Skip to content

Commit ac8ee1e

Browse files
Copilotlstein
andcommitted
Fix: Use user-specific rooms instead of socket ID tracking for event isolation
Co-authored-by: lstein <111189+lstein@users.noreply.github.com>
1 parent dcd114f commit ac8ee1e

File tree

1 file changed

+31
-40
lines changed

1 file changed

+31
-40
lines changed

invokeai/app/api/sockets.py

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ async def _handle_disconnect(self, sid: str) -> None:
159159
logger.debug(f"Socket {sid} disconnected and cleaned up")
160160

161161
async def _handle_sub_queue(self, sid: str, data: Any) -> None:
162-
"""Handle queue subscription and ensure user info is available for this socket."""
162+
"""Handle queue subscription and add socket to both queue and user-specific rooms."""
163163
queue_id = QueueSubscriptionEvent(**data).queue_id
164164

165165
# Check if we have user info for this socket
@@ -171,9 +171,21 @@ async def _handle_sub_queue(self, sid: str, data: Any) -> None:
171171
"is_admin": False,
172172
}
173173

174-
await self._sio.enter_room(sid, queue_id)
175174
user_id = self._socket_users[sid]['user_id']
176-
logger.info(f"Socket {sid} (user_id: {user_id}) subscribed to queue {queue_id}")
175+
is_admin = self._socket_users[sid]['is_admin']
176+
177+
# Add socket to the queue room
178+
await self._sio.enter_room(sid, queue_id)
179+
180+
# Also add socket to a user-specific room for event filtering
181+
user_room = f"user:{user_id}"
182+
await self._sio.enter_room(sid, user_room)
183+
184+
# If admin, also add to admin room to receive all events
185+
if is_admin:
186+
await self._sio.enter_room(sid, "admin")
187+
188+
logger.info(f"Socket {sid} (user_id: {user_id}, is_admin: {is_admin}) subscribed to queue {queue_id} and user room {user_room}")
177189

178190
async def _handle_unsub_queue(self, sid: str, data: Any) -> None:
179191
await self._sio.leave_room(sid, QueueSubscriptionEvent(**data).queue_id)
@@ -195,45 +207,24 @@ async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]):
195207

196208
# Check if this is a queue item event that should be filtered by user
197209
if isinstance(event_data, QueueItemEventBase) and hasattr(event_data, "user_id"):
198-
# Get all socket IDs in the queue room (this is a synchronous generator, not awaitable)
199-
room_name = event_data.queue_id
200-
room_participants = self._sio.manager.get_participants("/", room_name)
210+
# Emit to user-specific room and admin room
211+
user_room = f"user:{event_data.user_id}"
201212

202-
# Filter sids based on user_id or admin status
203-
emitted_count = 0
204-
participants_checked = 0
205-
for participant in room_participants:
206-
participants_checked += 1
207-
# get_participants returns tuples of (namespace, sid) - extract the sid
208-
sid = participant[1] if isinstance(participant, tuple) else participant
209-
try:
210-
# Get user info from internal dict instead of socket session
211-
user_info = self._socket_users.get(sid)
212-
if user_info:
213-
session_user_id = user_info.get("user_id", "system")
214-
is_admin = user_info.get("is_admin", False)
215-
216-
logger.debug(f"Checking socket {sid}: session_user_id={session_user_id}, event_user_id={event_data.user_id}, is_admin={is_admin}")
217-
218-
# Emit to the owner or to admins
219-
if session_user_id == event_data.user_id or is_admin:
220-
await self._sio.emit(
221-
event=event_name,
222-
data=event_data.model_dump(mode="json"),
223-
room=sid, # Emit to specific socket
224-
)
225-
emitted_count += 1
226-
logger.debug(f"Emitted {event_name} to socket {sid}")
227-
else:
228-
logger.warning(f"No user info found for socket {sid} in room {room_name}")
229-
except Exception as e:
230-
# Log and continue if we can't emit to this specific socket
231-
# (e.g., socket disconnected, session expired)
232-
logger.warning(f"Failed to emit event {event_name} to socket {sid}: {e}")
213+
# Emit to the user's room
214+
await self._sio.emit(
215+
event=event_name,
216+
data=event_data.model_dump(mode="json"),
217+
room=user_room
218+
)
219+
220+
# Also emit to admin room so admins can see all events
221+
await self._sio.emit(
222+
event=event_name,
223+
data=event_data.model_dump(mode="json"),
224+
room="admin"
225+
)
233226

234-
if emitted_count == 0:
235-
logger.warning(f"Event {event_name} was not emitted to any sockets (user_id: {event_data.user_id}, room: {room_name}, participants_checked: {participants_checked}, total_socket_users: {len(self._socket_users)})")
236-
logger.debug(f"Available socket users: {list(self._socket_users.keys())}")
227+
logger.debug(f"Emitted {event_name} to user room {user_room} and admin room")
237228
else:
238229
# For non-item events (like queue status), emit to all subscribers
239230
await self._sio.emit(

0 commit comments

Comments
 (0)