Skip to content

Commit 85d7873

Browse files
Merge branch 'master' into 8506-batch-db-requests-in-map-endpoint
2 parents 5fcf1be + a2ecd48 commit 85d7873

File tree

9 files changed

+120
-39
lines changed

9 files changed

+120
-39
lines changed

packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ def to_json(self) -> str:
143143

144144

145145
SOCKETIO_MESSAGE_PREFIX: Final[str] = "42"
146+
_WEBSOCKET_MESSAGE_PREFIX: Final[str] = "📡OSPARC-WEBSOCKET: "
146147

147148

148149
@dataclass
@@ -165,20 +166,31 @@ def _configure_websocket_events(self) -> None:
165166
) as ctx:
166167

167168
def on_framesent(payload: str | bytes) -> None:
168-
ctx.logger.debug("⬇️ Frame sent: %s", payload)
169+
ctx.logger.debug(
170+
"%s⬇️ Frame sent: %s", _WEBSOCKET_MESSAGE_PREFIX, payload
171+
)
169172

170173
def on_framereceived(payload: str | bytes) -> None:
171-
ctx.logger.debug("⬆️ Frame received: %s", payload)
174+
ctx.logger.debug(
175+
"%s⬆️ Frame received: %s", _WEBSOCKET_MESSAGE_PREFIX, payload
176+
)
172177

173178
def on_close(_: WebSocket) -> None:
174179
if self.auto_reconnect:
175-
ctx.logger.warning("⚠️ WebSocket closed. Attempting to reconnect...")
180+
ctx.logger.warning(
181+
"%s⚠️ WebSocket closed. Attempting to reconnect...",
182+
_WEBSOCKET_MESSAGE_PREFIX,
183+
)
176184
self._attempt_reconnect(ctx.logger)
177185
else:
178-
ctx.logger.warning("⚠️ WebSocket closed.")
186+
ctx.logger.warning(
187+
"%s⚠️ WebSocket closed.", _WEBSOCKET_MESSAGE_PREFIX
188+
)
179189

180190
def on_socketerror(error_msg: str) -> None:
181-
ctx.logger.error("❌ WebSocket error: %s", error_msg)
191+
ctx.logger.error(
192+
"%s❌ WebSocket error: %s", _WEBSOCKET_MESSAGE_PREFIX, error_msg
193+
)
182194

183195
# Attach core event listeners
184196
self.ws.on("framesent", on_framesent)

packages/pytest-simcore/src/pytest_simcore/helpers/playwright_sim4life.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ class WaitForS4LDict(TypedDict):
9898
iframe: FrameLocator
9999

100100

101+
_WEBSOCKET_MESSAGE_S4L_PREFIX: Final[str] = "📡S4L-WEBSOCKET: "
102+
103+
101104
def wait_for_launched_s4l(
102105
page: Page,
103106
node_id,
@@ -137,6 +140,31 @@ def wait_for_launched_s4l(
137140
)
138141
s4l_websocket = ws_info.value
139142
ctx.logger.info("acquired S4L websocket!")
143+
144+
def on_framesent(payload: str | bytes) -> None:
145+
ctx.logger.debug(
146+
"%s⬇️ Frame sent: %s", _WEBSOCKET_MESSAGE_S4L_PREFIX, payload
147+
)
148+
149+
def on_framereceived(payload: str | bytes) -> None:
150+
ctx.logger.debug(
151+
"%s⬆️ Frame received: %s", _WEBSOCKET_MESSAGE_S4L_PREFIX, payload
152+
)
153+
154+
def on_close(_: WebSocket) -> None:
155+
ctx.logger.warning("%s⚠️ WebSocket closed.", _WEBSOCKET_MESSAGE_S4L_PREFIX)
156+
157+
def on_socketerror(error_msg: str) -> None:
158+
ctx.logger.error(
159+
"%s❌ WebSocket error: %s", _WEBSOCKET_MESSAGE_S4L_PREFIX, error_msg
160+
)
161+
162+
# Attach core event listeners
163+
s4l_websocket.on("framesent", on_framesent)
164+
s4l_websocket.on("framereceived", on_framereceived)
165+
s4l_websocket.on("close", on_close)
166+
s4l_websocket.on("socketerror", on_socketerror)
167+
140168
return {
141169
"websocket": s4l_websocket,
142170
"iframe": s4l_iframe,

packages/service-library/src/servicelib/long_running_tasks/models.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,16 @@ class TaskData(BaseModel):
7171
),
7272
] = None
7373

74+
detected_as_done_at: Annotated[
75+
datetime | None,
76+
Field(
77+
description=(
78+
"used to remove the task when it's first detected as done "
79+
"if a task was started as fire_and_forget=True"
80+
)
81+
),
82+
] = None
83+
7484
is_done: Annotated[
7585
bool,
7686
Field(description="True when the task finished running with or without errors"),

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,33 @@ async def _get_tasks_to_remove(
113113

114114
for tracked_task in await tracked_tasks.list_tasks_data():
115115
if tracked_task.fire_and_forget:
116-
continue
116+
# fire and forget tasks also need to be remove from tracking
117+
# when detectes as done, start counting how much time has elapsed
118+
# if over stale_task_detect_timeout_s remove the task
119+
120+
# wait for task to complete
121+
if not tracked_task.is_done:
122+
continue
123+
124+
# mark detected as done
125+
if tracked_task.detected_as_done_at is None:
126+
await tracked_tasks.update_task_data(
127+
tracked_task.task_id,
128+
updates={
129+
"detected_as_done_at": datetime.datetime.now(tz=datetime.UTC)
130+
},
131+
)
132+
continue
133+
134+
# if enough time passes remove the task
135+
elapsed_since_done = (
136+
utc_now - tracked_task.detected_as_done_at
137+
).total_seconds()
138+
if elapsed_since_done > stale_task_detect_timeout_s:
139+
tasks_to_remove.append(
140+
(tracked_task.task_id, tracked_task.task_context)
141+
)
142+
continue
117143

118144
if tracked_task.last_status_check is None:
119145
# the task just added or never received a poll request
@@ -317,6 +343,7 @@ async def _tasks_monitor(self) -> None: # noqa: C901
317343
"""
318344
self._started_event_task_tasks_monitor.set()
319345
task_id: TaskId
346+
320347
for task_id in set(self._created_tasks.keys()):
321348
if task := self._created_tasks.get(task_id, None):
322349
is_done = task.done()

packages/service-library/src/servicelib/rabbitmq/_utils.py

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async def declare_queue(
7676
if arguments is not None:
7777
default_arguments.update(arguments)
7878
queue_parameters: dict[str, Any] = {
79-
"durable": True,
79+
"durable": not exclusive_queue,
8080
"exclusive": exclusive_queue,
8181
"arguments": default_arguments,
8282
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{queue_name}_exclusive",
@@ -85,19 +85,6 @@ async def declare_queue(
8585
# NOTE: setting a name will ensure multiple instance will take their data here
8686
queue_parameters |= {"name": queue_name}
8787

88-
# avoids deprecated `transient_nonexcl_queues` warning in RabbitMQ
89-
if (
90-
queue_parameters.get("durable", False) is False
91-
and queue_parameters.get("exclusive", False) is False
92-
):
93-
msg = (
94-
"Queue must be `durable` or `exclusive`, but not both. "
95-
"This is to avoid the `transient_nonexcl_queues` warning. "
96-
"NOTE: if both `durable` and `exclusive` are missing they are considered False. "
97-
f"{queue_parameters=}"
98-
)
99-
raise ValueError(msg)
100-
10188
# NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED``
10289
# most likely someone changed the signature of the queues (parameters etc...)
10390
# Safest way to deal with it:

packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ def _get_resutlt(result_field: ResultField) -> Any:
199199
return loads(result_field.str_result)
200200

201201

202-
async def test_fire_and_forget_task_is_not_auto_removed(
202+
async def test_fire_and_forget_task_is_not_auto_removed_while_running(
203203
long_running_manager: LongRunningManager, empty_context: TaskContext
204204
):
205205
task_id = await lrt_api.start_task(
@@ -211,19 +211,32 @@ async def test_fire_and_forget_task_is_not_auto_removed(
211211
fire_and_forget=True,
212212
task_context=empty_context,
213213
)
214+
214215
await asyncio.sleep(3 * TEST_CHECK_STALE_INTERVAL_S)
215216
# the task shall still be present even if we did not check the status before
216217
status = await long_running_manager.tasks_manager.get_task_status(
217218
task_id, with_task_context=empty_context
218219
)
219220
assert not status.done, "task was removed although it is fire and forget"
220-
# the task shall finish
221-
await asyncio.sleep(4 * TEST_CHECK_STALE_INTERVAL_S)
222-
# get the result
223-
task_result = await long_running_manager.tasks_manager.get_task_result(
224-
task_id, with_task_context=empty_context
225-
)
226-
assert _get_resutlt(task_result) == 42
221+
222+
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
223+
with attempt:
224+
try:
225+
await long_running_manager.tasks_manager.get_task_status(
226+
task_id, with_task_context=empty_context
227+
)
228+
raise TryAgain
229+
except TaskNotFoundError:
230+
pass
231+
232+
with pytest.raises(TaskNotFoundError):
233+
await long_running_manager.tasks_manager.get_task_status(
234+
task_id, with_task_context=empty_context
235+
)
236+
with pytest.raises(TaskNotFoundError):
237+
await long_running_manager.tasks_manager.get_task_result(
238+
task_id, with_task_context=empty_context
239+
)
227240

228241

229242
async def test_get_result_of_unfinished_task_raises(

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from models_library.rabbitmq_messages import InstrumentationRabbitMessage
1414
from models_library.rpc.webserver.auth.api_keys import generate_unique_api_key
1515
from models_library.service_settings_labels import SimcoreServiceLabels
16+
from models_library.services_types import ServiceRunID
1617
from models_library.shared_user_preferences import (
1718
AllowMetricsCollectionFrontendUserPreference,
1819
)
@@ -287,17 +288,19 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
287288
scheduler_data.node_uuid
288289
)
289290

290-
await _cleanup_long_running_tasks(app, scheduler_data.node_uuid)
291+
await _cleanup_long_running_tasks(app, scheduler_data.run_id)
291292

292293
await task_progress.update(
293294
message="finished removing resources", percent=ProgressPercent(1)
294295
)
295296

296297

297-
async def _cleanup_long_running_tasks(app: FastAPI, node_id: NodeID) -> None:
298+
async def _cleanup_long_running_tasks(
299+
app: FastAPI, service_run_id: ServiceRunID
300+
) -> None:
298301
long_running_client_helper = get_long_running_client_helper(app)
299302

300-
sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{node_id}"
303+
sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{service_run_id}"
301304
await long_running_client_helper.cleanup(sidecar_namespace)
302305

303306

services/static-webserver/client/source/class/osparc/store/Groups.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ qx.Class.define("osparc.store.Groups", {
159159
this.__addMemberToCache(orgMember, groupId);
160160
});
161161
}
162-
});
162+
})
163+
.catch(err => osparc.FlashMessenger.logError(err));
163164
},
164165

165166
fetchGroupsAndMembers: function() {

services/static-webserver/client/source/class/osparc/store/Users.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ qx.Class.define("osparc.store.Users", {
3636
members: {
3737
__unknowns: null,
3838

39-
__fetchUser: function(groupId) {
39+
__fetchUser: function(userGroupId) {
4040
const params = {
4141
url: {
42-
gid: groupId
42+
gid: userGroupId
4343
}
4444
};
4545
return osparc.data.Resources.fetch("users", "get", params)
@@ -48,22 +48,22 @@ qx.Class.define("osparc.store.Users", {
4848
return user;
4949
})
5050
.catch(() => {
51-
this.__unknowns.push(groupId);
51+
this.__unknowns.push(userGroupId);
5252
return null;
5353
});
5454
},
5555

56-
getUser: async function(groupId, fetchIfNotFound = true) {
57-
if (this.__unknowns.includes(groupId)) {
56+
getUser: async function(userGroupId, fetchIfNotFound = true) {
57+
if (this.__unknowns.includes(userGroupId)) {
5858
return null;
5959
}
60-
const userFound = this.getUsers().find(user => user.getGroupId() === groupId);
60+
const userFound = this.getUsers().find(user => user.getGroupId() === userGroupId);
6161
if (userFound) {
6262
return userFound;
6363
}
6464
if (fetchIfNotFound) {
6565
try {
66-
const user = await this.__fetchUser(groupId);
66+
const user = await this.__fetchUser(userGroupId);
6767
if (user) {
6868
return user;
6969
}

0 commit comments

Comments
 (0)