Skip to content

Commit 23c22ca

Browse files
committed
removed useless code
1 parent 9eff90d commit 23c22ca

File tree

20 files changed

+82
-113
lines changed

20 files changed

+82
-113
lines changed

packages/service-library/src/servicelib/background_task.py

Lines changed: 31 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,14 @@
33
import datetime
44
import functools
55
import logging
6-
from collections.abc import AsyncIterator, Awaitable, Callable
7-
from typing import Any, Coroutine, Final, ParamSpec, TypeVar
6+
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
7+
from typing import Any, Final, ParamSpec, TypeVar
88

99
from tenacity import TryAgain, before_sleep_log, retry, retry_if_exception_type
10-
from tenacity.asyncio import AsyncRetrying
1110
from tenacity.wait import wait_fixed
1211

1312
from .async_utils import retried_cancel_task, with_delay
14-
from .logging_utils import log_catch, log_context
13+
from .logging_utils import log_context
1514

1615
_logger = logging.getLogger(__name__)
1716

@@ -43,6 +42,21 @@ def periodic(
4342
) -> Callable[
4443
[Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]]
4544
]:
45+
"""Calls the function periodically with a given interval.
46+
47+
Arguments:
48+
interval -- the interval between calls
49+
50+
Keyword Arguments:
51+
raise_on_error -- If False the function will be retried indefinitely unless cancelled.
52+
If True the function will be retried indefinitely unless cancelled
53+
or an exception is raised. (default: {False})
54+
early_wake_up_event -- allows to awaken the function before the interval has passed. (default: {None})
55+
56+
Returns:
57+
_description_
58+
"""
59+
4660
def _decorator(
4761
func: Callable[P, Coroutine[Any, Any, None]],
4862
) -> Callable[P, Coroutine[Any, Any, None]]:
@@ -71,50 +85,6 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
7185
return _decorator
7286

7387

74-
async def _periodic_scheduled_task(
75-
task: Callable[..., Awaitable[None]],
76-
*,
77-
interval: datetime.timedelta,
78-
task_name: str,
79-
raise_on_error: bool,
80-
early_wake_up_event: asyncio.Event | None,
81-
**task_kwargs,
82-
) -> None:
83-
"""periodically runs task with a given interval.
84-
If raise_on_error is False, the task will be retried indefinitely unless cancelled.
85-
If raise_on_error is True, the task will be retried indefinitely unless cancelled or an exception is raised.
86-
If early_wake_up_event is set, the task might be woken up earlier than interval when the event is set.
87-
88-
Raises:
89-
task exception if raise_on_error is True
90-
"""
91-
nap = (
92-
asyncio.sleep
93-
if early_wake_up_event is None
94-
else SleepUsingAsyncioEvent(early_wake_up_event)
95-
)
96-
async for attempt in AsyncRetrying(
97-
sleep=nap,
98-
wait=wait_fixed(interval.total_seconds()),
99-
reraise=True,
100-
retry=retry_if_exception_type(TryAgain)
101-
if raise_on_error
102-
else retry_if_exception_type(),
103-
):
104-
with attempt:
105-
with (
106-
log_context(
107-
_logger,
108-
logging.DEBUG,
109-
msg=f"iteration {attempt.retry_state.attempt_number} of '{task_name}'",
110-
),
111-
log_catch(_logger),
112-
):
113-
await task(**task_kwargs)
114-
115-
raise TryAgain
116-
117-
11888
def create_periodic_task(
11989
task: Callable[..., Awaitable[None]],
12090
*,
@@ -125,34 +95,19 @@ def create_periodic_task(
12595
early_wake_up_event: asyncio.Event | None = None,
12696
**kwargs,
12797
) -> asyncio.Task:
128-
with log_context(
129-
_logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'"
130-
):
131-
delayed_periodic_scheduled_task = with_delay(wait_before_running)(
132-
_periodic_scheduled_task
133-
)
134-
return asyncio.create_task(
135-
delayed_periodic_scheduled_task(
136-
task,
137-
interval=interval,
138-
task_name=task_name,
139-
raise_on_error=raise_on_error,
140-
early_wake_up_event=early_wake_up_event,
141-
**kwargs,
142-
),
143-
name=task_name,
144-
)
145-
98+
@with_delay(wait_before_running)
99+
@periodic(
100+
interval=interval,
101+
raise_on_error=raise_on_error,
102+
early_wake_up_event=early_wake_up_event,
103+
)
104+
async def _() -> None:
105+
await task(**kwargs)
146106

147-
async def stop_periodic_task(
148-
asyncio_task: asyncio.Task, *, timeout: float | None = None
149-
) -> None:
150107
with log_context(
151-
_logger,
152-
logging.DEBUG,
153-
msg=f"cancel periodic background task '{asyncio_task.get_name()}'",
108+
_logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'"
154109
):
155-
await retried_cancel_task(asyncio_task, timeout=timeout)
110+
return asyncio.create_task(_(), name=task_name)
156111

157112

158113
@contextlib.asynccontextmanager
@@ -179,4 +134,6 @@ async def periodic_task(
179134
if asyncio_task is not None:
180135
# NOTE: this stopping is shielded to prevent the cancellation to propagate
181136
# into the stopping procedure
182-
await asyncio.shield(stop_periodic_task(asyncio_task, timeout=stop_timeout))
137+
await asyncio.shield(
138+
retried_cancel_task(asyncio_task, timeout=stop_timeout)
139+
)

packages/service-library/tests/redis/test_decorators.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import arrow
1212
import pytest
1313
from faker import Faker
14-
from servicelib.background_task import stop_periodic_task
14+
from servicelib.async_utils import retried_cancel_task
1515
from servicelib.redis import (
1616
CouldNotAcquireLockError,
1717
RedisClientSDK,
@@ -217,7 +217,7 @@ async def _assert_task_completes_once(
217217

218218
await _assert_on_sleep_done(sleep_events, stop_after=stop_after)
219219

220-
await stop_periodic_task(started_task, timeout=5)
220+
await retried_cancel_task(started_task, timeout=5)
221221

222222
events_timestamps: tuple[float, ...] = tuple(
223223
x.args[0].timestamp() for x in sleep_events.call_args_list

packages/service-library/tests/test_background_task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
import pytest
1515
from faker import Faker
1616
from pytest_mock.plugin import MockerFixture
17+
from servicelib.async_utils import retried_cancel_task
1718
from servicelib.background_task import ( # Assuming the module is imported correctly
1819
create_periodic_task,
1920
periodic,
2021
periodic_task,
21-
stop_periodic_task,
2222
)
2323

2424
_FAST_POLL_INTERVAL: Final[int] = 1
@@ -73,7 +73,7 @@ async def _creator(
7373
yield _creator
7474
# cleanup
7575
await asyncio.gather(
76-
*(stop_periodic_task(t, timeout=stop_task_timeout) for t in created_tasks)
76+
*(retried_cancel_task(t, timeout=stop_task_timeout) for t in created_tasks)
7777
)
7878

7979

services/agent/src/simcore_service_agent/services/volumes_manager.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
from fastapi import FastAPI
1010
from models_library.projects_nodes_io import NodeID
1111
from pydantic import NonNegativeFloat
12-
from servicelib.background_task import create_periodic_task, stop_periodic_task
12+
from servicelib.async_utils import retried_cancel_task
13+
from servicelib.background_task import create_periodic_task
1314
from servicelib.fastapi.app_state import SingletonInAppStateMixin
1415
from servicelib.logging_utils import log_context
1516
from servicelib.rabbitmq.rpc_interfaces.agent.errors import (
@@ -60,10 +61,10 @@ async def shutdown(self) -> None:
6061
await self.docker.close()
6162

6263
if self._task_bookkeeping:
63-
await stop_periodic_task(self._task_bookkeeping)
64+
await retried_cancel_task(self._task_bookkeeping)
6465

6566
if self._task_periodic_volume_cleanup:
66-
await stop_periodic_task(self._task_periodic_volume_cleanup)
67+
await retried_cancel_task(self._task_periodic_volume_cleanup)
6768

6869
async def _bookkeeping_task(self) -> None:
6970
with log_context(_logger, logging.DEBUG, "volume bookkeeping"):

services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
from fastapi import FastAPI
77
from prometheus_client import CollectorRegistry, Gauge
88
from pydantic import PositiveInt
9-
from servicelib.background_task import create_periodic_task, stop_periodic_task
9+
from servicelib.async_utils import retried_cancel_task
10+
from servicelib.background_task import create_periodic_task
1011
from servicelib.fastapi.prometheus_instrumentation import (
1112
setup_prometheus_instrumentation as setup_rest_instrumentation,
1213
)
@@ -79,7 +80,7 @@ async def on_startup() -> None:
7980
async def on_shutdown() -> None:
8081
assert app.state.instrumentation_task # nosec
8182
with log_catch(_logger, reraise=False):
82-
await stop_periodic_task(app.state.instrumentation_task)
83+
await retried_cancel_task(app.state.instrumentation_task)
8384

8485
app.add_event_handler("startup", on_startup)
8586
app.add_event_handler("shutdown", on_shutdown)

services/api-server/src/simcore_service_api_server/core/health_checker.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
from models_library.rabbitmq_messages import LoggerRabbitMessage
1010
from models_library.users import UserID
1111
from pydantic import NonNegativeInt, PositiveFloat, PositiveInt
12-
from servicelib.background_task import create_periodic_task, stop_periodic_task
12+
from servicelib.async_utils import retried_cancel_task
13+
from servicelib.background_task import create_periodic_task
1314
from servicelib.fastapi.dependencies import get_app
1415
from servicelib.logging_utils import log_catch
1516
from servicelib.rabbitmq import RabbitMQClient
@@ -62,7 +63,7 @@ async def setup(self, health_check_task_period_seconds: PositiveFloat):
6263
async def teardown(self):
6364
if self._background_task:
6465
with log_catch(_logger, reraise=False):
65-
await stop_periodic_task(
66+
await retried_cancel_task(
6667
self._background_task, timeout=self._timeout_seconds
6768
)
6869
await self._log_distributor.deregister(job_id=self._dummy_job_id)

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from typing import Final
44

55
from fastapi import FastAPI
6-
from servicelib.background_task import create_periodic_task, stop_periodic_task
6+
from servicelib.async_utils import retried_cancel_task
7+
from servicelib.background_task import create_periodic_task
78
from servicelib.redis import exclusive
89

910
from ..core.settings import ApplicationSettings
@@ -43,7 +44,7 @@ async def _startup() -> None:
4344

4445
def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]:
4546
async def _stop() -> None:
46-
await stop_periodic_task(app.state.autoscaler_task)
47+
await retried_cancel_task(app.state.autoscaler_task)
4748

4849
return _stop
4950

services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from typing import Final
44

55
from fastapi import FastAPI
6-
from servicelib.background_task import create_periodic_task, stop_periodic_task
6+
from servicelib.async_utils import retried_cancel_task
7+
from servicelib.background_task import create_periodic_task
78
from servicelib.redis import exclusive
89

910
from ..core.settings import ApplicationSettings
@@ -43,7 +44,7 @@ async def _startup() -> None:
4344
def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]:
4445
async def _stop() -> None:
4546
if hasattr(app.state, "buffers_pool_task"):
46-
await stop_periodic_task(app.state.buffers_pool_task)
47+
await retried_cancel_task(app.state.buffers_pool_task)
4748

4849
return _stop
4950

services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from collections.abc import Awaitable, Callable
44

55
from fastapi import FastAPI
6-
from servicelib.background_task import create_periodic_task, stop_periodic_task
6+
from servicelib.async_utils import retried_cancel_task
7+
from servicelib.background_task import create_periodic_task
78
from servicelib.redis import exclusive
89

910
from .._meta import APP_NAME
@@ -36,7 +37,7 @@ async def _startup() -> None:
3637

3738
def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]:
3839
async def _stop() -> None:
39-
await stop_periodic_task(app.state.clusters_cleaning_task, timeout=5)
40+
await retried_cancel_task(app.state.clusters_cleaning_task, timeout=5)
4041

4142
return _stop
4243

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
from fastapi import FastAPI
77
from models_library.projects import ProjectID
88
from models_library.users import UserID
9-
from servicelib.background_task import create_periodic_task, stop_periodic_task
9+
from servicelib.async_utils import retried_cancel_task
10+
from servicelib.background_task import create_periodic_task
1011
from servicelib.exception_utils import silence_exceptions
1112
from servicelib.logging_utils import log_context
1213
from servicelib.redis import CouldNotAcquireLockError, exclusive
@@ -164,4 +165,4 @@ async def setup_manager(app: FastAPI) -> None:
164165

165166

166167
async def shutdown_manager(app: FastAPI) -> None:
167-
await stop_periodic_task(app.state.scheduler_manager)
168+
await retried_cancel_task(app.state.scheduler_manager)

0 commit comments

Comments
 (0)