Skip to content

Commit 696938b

Browse files
sandereggmatusdrobuliak66
authored andcommitted
🐛Sticky connection: Ensure emitted socketio messages for logs, progress, status updates and payments are not lost (#7967)
1 parent 474fdfd commit 696938b

File tree

6 files changed

+11
-13
lines changed

6 files changed

+11
-13
lines changed

services/docker-compose.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -888,10 +888,14 @@ services:
888888
- traefik.http.routers.${SWARM_STACK_NAME}_webserver_sticky.rule=Path(`/v0/projects`) ||
889889
Path(`/v0/projects:clone`) ||
890890
PathRegexp(`^/v0/projects/[0-9a-fA-F-]+/nodes/[0-9a-fA-F-]+:stop`) ||
891+
PathRegexp(`^/v0/projects/[0-9a-fA-F-]+/nodes/[0-9a-fA-F-]+:open`) ||
892+
PathRegexp(`^/v0/projects/[0-9a-fA-F-]+/nodes/[0-9a-fA-F-]+:close`) ||
891893
PathRegexp(`^/v0/storage/locations/[0-9]+/paths/.+:size`) ||
892894
PathRegexp(`^/v0/storage/locations/[0-9]+/-/paths:batchDelete`) ||
893895
PathRegexp(`^/v0/storage/locations/[0-9]+/export-data`) ||
894896
PathRegexp(`^/v0/tasks-legacy/.+`)
897+
# NOTE: the sticky router must have a higher priority than the webserver router but below dy-proxies
898+
- traefik.http.routers.${SWARM_STACK_NAME}_webserver_sticky.priority=8
895899
- traefik.http.routers.${SWARM_STACK_NAME}_webserver_sticky.entrypoints=http
896900
- traefik.http.routers.${SWARM_STACK_NAME}_webserver_sticky.service=${SWARM_STACK_NAME}_webserver_sticky
897901
- traefik.http.routers.${SWARM_STACK_NAME}_webserver_sticky.middlewares=${SWARM_STACK_NAME}_gzip@swarm, ${SWARM_STACK_NAME_NO_HYPHEN}_sslheader@swarm, ${SWARM_STACK_NAME}_webserver_retry

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
9393
app,
9494
rabbit_message.user_id,
9595
message=message,
96-
ignore_queue=True,
9796
)
9897
return True
9998

@@ -107,7 +106,6 @@ async def _log_message_parser(app: web.Application, data: bytes) -> bool:
107106
event_type=SOCKET_IO_LOG_EVENT,
108107
data=rabbit_message.model_dump(exclude={"user_id", "channel_name"}),
109108
),
110-
ignore_queue=True,
111109
)
112110
return True
113111

@@ -124,7 +122,6 @@ async def _events_message_parser(app: web.Application, data: bytes) -> bool:
124122
"node_id": f"{rabbit_message.node_id}",
125123
},
126124
),
127-
ignore_queue=True,
128125
)
129126
return True
130127

@@ -178,9 +175,10 @@ async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> b
178175

179176

180177
async def _unsubscribe_from_rabbitmq(app) -> None:
181-
with log_context(
182-
_logger, logging.INFO, msg="Unsubscribing from rabbitmq channels"
183-
), log_catch(_logger, reraise=False):
178+
with (
179+
log_context(_logger, logging.INFO, msg="Unsubscribing from rabbitmq channels"),
180+
log_catch(_logger, reraise=False),
181+
):
184182
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
185183
await logged_gather(
186184
*(

services/web/server/src/simcore_service_webserver/payments/_socketio.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ async def notify_payment_completed(
2929
event_type=SOCKET_IO_PAYMENT_COMPLETED_EVENT,
3030
data=jsonable_encoder(payment, by_alias=True),
3131
),
32-
ignore_queue=True,
3332
)
3433

3534

@@ -46,5 +45,4 @@ async def notify_payment_method_acked(
4645
event_type=SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT,
4746
data=jsonable_encoder(payment_method_transaction, by_alias=True),
4847
),
49-
ignore_queue=True,
5048
)

services/web/server/src/simcore_service_webserver/projects/_projects_service.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1961,7 +1961,6 @@ async def notify_project_state_update(
19611961
app,
19621962
user_id=notify_only_user,
19631963
message=message,
1964-
ignore_queue=True,
19651964
)
19661965
else:
19671966
rooms_to_notify: Generator[GroupID, None, None] = (

services/web/server/src/simcore_service_webserver/socketio/messages.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ async def send_message_to_user(
6060
user_id: UserID,
6161
message: SocketMessageDict,
6262
*,
63-
ignore_queue: bool,
63+
ignore_queue: bool = False,
6464
) -> None:
6565
"""
6666
Keyword Arguments:
67-
ignore_queue -- set to False when this message is delivered from a server that has no direct connection to the client (default: {True})
68-
An example where this is value is False, is sending messages to a user in the GC
67+
ignore_queue -- set to True when this message is delivered from a server that has no direct connection to the user client (default: {False})
68+
Be careful with this option, as it can lead to message loss if the user is not connected to this server!!
6969
"""
7070
sio: AsyncServer = get_socket_server(app)
7171

services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,6 @@ async def test_log_workflow_only_receives_messages_if_subscribed(
290290
"event_type": SOCKET_IO_LOG_EVENT,
291291
"data": log_message.model_dump(exclude={"user_id", "channel_name"}),
292292
},
293-
ignore_queue=True,
294293
),
295294
)
296295
mocked_send_messages.reset_mock()

0 commit comments

Comments
 (0)