Skip to content

Commit ed0b00c

Browse files
Andrei Neagu authored
Enable orphan service removal + making stop service more solid (#1842)
* orphaned services removal enabled again * decorator makes async function calls run in sequence * requests to stop_service for the same service are now serialized * fixed typo * corrected parameter name * fix incorrect usage: was considering all variables in the scope of the function; it will now only consider the function's arguments * removed out of date comment * function got a more appropriate name * getting codeclimate unstuck :\ Co-authored-by: Andrei Neagu <[email protected]>
1 parent 3ef931e commit ed0b00c

File tree

5 files changed

+287
-14
lines changed

5 files changed

+287
-14
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,4 @@ This project is licensed under the terms of the [MIT license](LICENSE).
128128
[chocolatey]:https://chocolatey.org/
129129
[vscode]:https://code.visualstudio.com/
130130
[WSL]:https://docs.microsoft.com/en-us/windows/wsl/faq
131+
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import asyncio
from collections import deque
from dataclasses import dataclass
from functools import wraps
from typing import Dict, List, Optional

import attr
6+
7+
8+
@dataclass
class Context:
    """Mutable per-context state used to serialize calls that share a key.

    Stdlib ``@dataclass`` replaces ``@attr.s(auto_attribs=True)``: for this
    plain record both generate the same keyword-capable ``__init__``,
    ``__eq__`` and ``__repr__``, without the third-party dependency.
    """

    # queue of work items submitted for this context
    in_queue: asyncio.Queue
    # queue used to hand results back to awaiting callers
    out_queue: asyncio.Queue
    # True once this context's background worker task has been started
    initialized: bool
13+
14+
15+
def run_sequentially_in_context(target_args: Optional[List[str]] = None):
    """All requests to a decorated coroutine with the same calling context
    will be run sequentially; calls with different contexts run independently.

    Example:

    Given the following decorated function

        @run_sequentially_in_context(target_args=["param3", "param1"])
        async def func(param1, param2, param3):
            await asyncio.sleep(1)

    The context will be formed by the values of the arguments "param3" and "param1".
    The values must be serializable as they will be converted to string
    and put together as storage key for the context.

    The below calls will all run in a sequence:

        functions = [
            func(1, "something", 3),
            func(1, "else", 3),
            func(1, "here", 3),
        ]
        await asyncio.gather(*functions)

    The following calls will run in parallel, because they have different contexts:

        functions = [
            func(1, "something", 3),
            func(2, "else", 3),
            func(3, "here", 3),
        ]
        await asyncio.gather(*functions)

    :param target_args: names of the decorated function's parameters whose
        values form the serialization key; ``None`` means a single shared key.
    :raises ValueError: (at call time) when a name in ``target_args`` is not
        among the call's arguments.
    """
    target_args = [] if target_args is None else target_args

    def internal(decorated_function):
        # one Context per distinct key
        # NOTE(review): entries and their worker tasks are never evicted, so
        # memory grows with the number of distinct contexts seen — acceptable
        # only if the key space is bounded; confirm with callers.
        contexts = {}

        def get_context(args, kwargs: Dict) -> Context:
            """Return (creating on first use) the Context for this call's key."""
            # co_varnames[:co_argcount] restricts the search to the function's
            # declared parameters, not every local in its scope
            arg_names = decorated_function.__code__.co_varnames[
                : decorated_function.__code__.co_argcount
            ]
            search_args = dict(zip(arg_names, args))
            search_args.update(kwargs)

            key_parts = deque()
            for arg in target_args:
                if arg not in search_args:
                    message = (
                        f"Expected '{arg}' in '{decorated_function.__name__}'"
                        f" arguments. Got '{search_args}'"
                    )
                    raise ValueError(message)
                key_parts.append(search_args[arg])

            key = ":".join(map(str, key_parts))

            if key not in contexts:
                contexts[key] = Context(
                    in_queue=asyncio.Queue(),
                    out_queue=asyncio.Queue(),
                    initialized=False,
                )
            return contexts[key]

        @wraps(decorated_function)
        async def wrapper(*args, **kwargs):
            context: Context = get_context(args, kwargs)

            if not context.initialized:
                context.initialized = True

                async def worker(in_q: asyncio.Queue) -> None:
                    # Run queued calls one at a time, resolving each caller's
                    # private future. A per-call future (instead of a shared
                    # out_queue) ties every result to the caller that submitted
                    # it, and allows the wrapped function to legitimately
                    # *return* an Exception instance — the previous
                    # `isinstance(result, Exception)` check re-raised those.
                    while True:
                        result_future, awaitable = await in_q.get()
                        in_q.task_done()
                        try:
                            result_future.set_result(await awaitable)
                        except Exception as e:  # pylint: disable=broad-except
                            result_future.set_exception(e)

                asyncio.get_event_loop().create_task(worker(context.in_queue))

            result_future = asyncio.get_event_loop().create_future()
            await context.in_queue.put(
                (result_future, decorated_function(*args, **kwargs))
            )
            # re-raises the wrapped function's exception, if any
            return await result_future

        return wrapper

    return internal
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import asyncio
2+
import copy
3+
import random
4+
from collections import deque
5+
from time import time
6+
from typing import Any, Dict, List
7+
8+
import pytest
9+
10+
from servicelib.async_utils import run_sequentially_in_context
11+
12+
13+
class LockedStore:
    """FIFO accumulator whose operations are serialized by an asyncio lock."""

    __slots__ = ("_queue", "_lock")

    def __init__(self):
        self._lock = asyncio.Lock()
        self._queue = deque()

    async def push(self, item: Any):
        """Append ``item`` at the end of the store."""
        async with self._lock:
            self._queue.append(item)

    async def get_all(self) -> List[Any]:
        """Return a snapshot of the stored items, in insertion order."""
        async with self._lock:
            return [entry for entry in self._queue]
27+
28+
29+
async def test_context_aware_dispatch() -> None:
    """Calls sharing a (c1, c2, c3) context must complete in submission order."""

    def make_key_from_context(context: Dict) -> str:
        return ".".join([f"{k}:{v}" for k, v in context.items()])

    @run_sequentially_in_context(target_args=["c1", "c2", "c3"])
    async def orderly(c1: Any, c2: Any, c3: Any, control: Any) -> None:
        _ = (c1, c2, c3)
        await asyncio.sleep(random.uniform(0, 0.01))

        await locked_stores[make_key_from_context(dict(c1=c1, c2=c2, c3=c3))].push(
            control
        )

    def make_context():
        return dict(
            c1=random.randint(0, 10), c2=random.randint(0, 10), c3=random.randint(0, 10)
        )

    contexts = [make_context() for _ in range(10)]

    # one store and one expected-order deque per distinct context key
    locked_stores = {}
    expected_outcomes = {}
    for context in contexts:
        key = make_key_from_context(context)
        locked_stores[key] = LockedStore()
        expected_outcomes[key] = deque()

    tasks = deque()
    for control in range(1000):
        chosen = random.choice(contexts)
        expected_outcomes[make_key_from_context(chosen)].append(control)

        call_params = copy.deepcopy(chosen)
        call_params["control"] = control

        tasks.append(asyncio.get_event_loop().create_task(orderly(**call_params)))

    for pending in tasks:
        await pending

    # within each context, pushes must have happened in submission order
    for context in contexts:
        key = make_key_from_context(context)
        assert list(expected_outcomes[key]) == await locked_stores[key].get_all()
74+
75+
76+
async def test_context_aware_function_sometimes_fails() -> None:
    """Exceptions raised by the wrapped function must reach the caller."""

    class DidFailException(Exception):
        pass

    @run_sequentially_in_context(target_args=["will_fail"])
    async def sometimes_failing(will_fail: bool) -> None:
        if will_fail:
            raise DidFailException("I was instructed to fail")
        return True

    for attempt in range(100):
        if attempt % 2 == 0:
            # even attempts are instructed to fail
            with pytest.raises(DidFailException):
                await sometimes_failing(True)
        else:
            assert await sometimes_failing(False) is True
94+
95+
96+
async def test_context_aware_wrong_target_args_name() -> None:
    """A target_args name absent from the signature must raise ValueError."""
    expected_param_name = "wrong_parameter"

    # pylint: disable=unused-argument
    @run_sequentially_in_context(target_args=[expected_param_name])
    async def target_function(the_param: Any) -> None:
        return None

    with pytest.raises(ValueError) as excinfo:
        await target_function("something")

    expected_prefix = (
        f"Expected '{expected_param_name}' in "
        f"'{target_function.__name__}' arguments."
    )
    assert str(excinfo.value).startswith(expected_prefix) is True
112+
113+
114+
async def test_context_aware_measure_parallelism() -> None:
    """Distinct contexts must overlap: total time ~ one sleep, not 1000."""

    @run_sequentially_in_context(target_args=["control"])
    async def sleep_for(sleep_interval: float, control: Any) -> Any:
        await asyncio.sleep(sleep_interval)
        return control

    control_sequence = list(range(1000))
    sleep_duration = 0.5
    pending = [sleep_for(sleep_duration, marker) for marker in control_sequence]

    start_time = time()
    results = await asyncio.gather(*pending)
    elapsed = time() - start_time

    # every call had a distinct context, so they all ran concurrently
    assert elapsed < sleep_duration * 2  # allow for some internal delay
    assert control_sequence == results
131+
132+
133+
async def test_context_aware_measure_serialization() -> None:
    """Identical contexts must run one-by-one: total time >= n * sleep."""

    @run_sequentially_in_context(target_args=["control"])
    async def sleep_for(sleep_interval: float, control: Any) -> Any:
        await asyncio.sleep(sleep_interval)
        return control

    control_sequence = [1 for _ in range(10)]
    sleep_duration = 0.1
    pending = [sleep_for(sleep_duration, marker) for marker in control_sequence]

    start_time = time()
    results = await asyncio.gather(*pending)
    elapsed = time() - start_time

    # all calls share the context key "1", so they must not overlap
    minimum_timelapse = sleep_duration * len(control_sequence)
    assert elapsed > minimum_timelapse
    assert control_sequence == results

services/director/src/simcore_service_director/producer.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import aiodocker
1212
import tenacity
1313
from aiohttp import ClientConnectionError, ClientSession, web
14+
from servicelib.async_utils import run_sequentially_in_context
1415

1516
from servicelib.monitor_services import service_started, service_stopped
1617

@@ -914,6 +915,7 @@ async def get_service_details(app: web.Application, node_uuid: str) -> Dict:
914915
) from err
915916

916917

918+
@run_sequentially_in_context(target_args=["node_uuid"])
917919
async def stop_service(app: web.Application, node_uuid: str) -> None:
918920
log.debug("stopping service with uuid %s", node_uuid)
919921
# get the docker client

services/web/server/src/simcore_service_webserver/resource_manager/garbage_collector.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ async def garbage_collector_task(app: web.Application):
8282

8383
async def collect_garbage(registry: RedisResourceRegistry, app: web.Application):
8484
"""
85-
Garbage collection has the task of removing trash from the system. The trash
85+
Garbage collection has the task of removing trash from the system. The trash
8686
can be divided in:
87-
87+
8888
- Websockets & Redis (used to keep track of current active connections)
8989
- GUEST users (used for temporary access to the system which are created on the fly)
90-
- deletion of users. If a user needs to be deleted it is manually marked as GUEST
90+
- deletion of users. If a user needs to be deleted it is manually marked as GUEST
9191
in the database
9292
9393
The resources are Redis entries where all information regarding all the
@@ -99,7 +99,7 @@ async def collect_garbage(registry: RedisResourceRegistry, app: web.Application)
9999
endpoint to refresh the TTL, thus declaring that the user (websocket connection) is
100100
still active. The `resource_deletion_timeout_seconds` is theTTL of the key.
101101
102-
The field `garbage_collection_interval_seconds` defines the interval at which this
102+
The field `garbage_collection_interval_seconds` defines the interval at which this
103103
function will be called.
104104
"""
105105
logger.info("collecting garbage...")
@@ -122,7 +122,7 @@ async def collect_garbage(registry: RedisResourceRegistry, app: web.Application)
122122
# Temporary disabling GC to until the dynamic service
123123
# safe function is invoked by the GC. This will avoid
124124
# data loss for current users.
125-
# await remove_orphaned_services(registry, app)
125+
await remove_orphaned_services(registry, app)
126126

127127

128128
async def remove_disconnected_user_resources(
@@ -189,7 +189,8 @@ async def remove_disconnected_user_resources(
189189
# if this user was a GUEST also remove it from the database
190190
# with the only associated project owned
191191
await remove_guest_user_with_all_its_resources(
192-
app=app, user_id=int(dead_key["user_id"]),
192+
app=app,
193+
user_id=int(dead_key["user_id"]),
193194
)
194195

195196

@@ -218,7 +219,8 @@ async def remove_users_manually_marked_as_guests(
218219
continue
219220

220221
await remove_guest_user_with_all_its_resources(
221-
app=app, user_id=guest_user_id,
222+
app=app,
223+
user_id=guest_user_id,
222224
)
223225

224226

@@ -283,11 +285,11 @@ async def remove_guest_user_with_all_its_resources(
283285
async def remove_all_projects_for_user(app: web.Application, user_id: int) -> None:
284286
"""
285287
Goes through all the projects and will try to remove them but first it will check if
286-
the project is shared with others.
288+
the project is shared with others.
287289
Based on the given access rights it will deltermine the action to take:
288290
- if other users have read access & execute access it will get deleted
289291
- if other users have write access the project's owner will be changed to a new owner:
290-
- if the project is directly shared with a one or more users, one of these
292+
- if the project is directly shared with a one or more users, one of these
291293
will be picked as the new owner
292294
- if the project is not shared with any user but with groups of users, one
293295
of the users inside the group (which currently exists) will be picked as
@@ -309,12 +311,15 @@ async def remove_all_projects_for_user(app: web.Application, user_id: int) -> No
309311
APP_PROJECT_DBAPI
310312
].list_all_projects_by_uuid_for_user(user_id=user_id)
311313
logger.info(
312-
"Project uuids, to clean, for user '%s': '%s'", user_id, user_project_uuids,
314+
"Project uuids, to clean, for user '%s': '%s'",
315+
user_id,
316+
user_project_uuids,
313317
)
314318

315319
for project_uuid in user_project_uuids:
316320
logger.debug(
317-
"Removing or transfering project '%s'", project_uuid,
321+
"Removing or transfering project '%s'",
322+
project_uuid,
318323
)
319324
try:
320325
project: Dict = await get_project_for_user(
@@ -411,7 +416,9 @@ async def get_new_project_owner_gid(
411416
# fallback to the groups search if the user does not exist
412417
if len(standard_groups) > 0 and new_project_owner_gid is None:
413418
new_project_owner_gid = await fetch_new_project_owner_from_groups(
414-
app=app, standard_groups=standard_groups, user_id=user_id,
419+
app=app,
420+
standard_groups=standard_groups,
421+
user_id=user_id,
415422
)
416423

417424
logger.info(
@@ -483,7 +490,8 @@ async def replace_current_owner(
483490
# syncing back project data
484491
try:
485492
await app[APP_PROJECT_DBAPI].update_project_without_enforcing_checks(
486-
project_data=project, project_uuid=project_uuid,
493+
project_data=project,
494+
project_uuid=project_uuid,
487495
)
488496
except Exception: # pylint: disable=broad-except
489497
logger.exception(
@@ -500,4 +508,3 @@ async def remove_user(app: web.Application, user_id: int) -> None:
500508
logger.warning(
501509
"User '%s' still has some projects, could not be deleted", user_id
502510
)
503-

0 commit comments

Comments
 (0)