Skip to content

Commit 351a803

Browse files
committed
handle WebSocketClosedError in publish and on_message methods; discard disconnected clients
1 parent c74e044 commit 351a803

File tree

2 files changed

+59
-9
lines changed

2 files changed

+59
-9
lines changed

biothings/hub/api/handlers/ws.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44

55
import sockjs.tornado
6+
from tornado.websocket import WebSocketClosedError
67

78
from biothings.utils.hub_db import ChangeListener
89

@@ -34,7 +35,12 @@ def __init__(self, session, listeners):
3435
super(WebSocketConnection, self).__init__(session)
3536

3637
def publish(self, message):
37-
self.broadcast(self.__class__.clients, message)
38+
try:
39+
self.broadcast(self.__class__.clients, message)
40+
except WebSocketClosedError:
41+
# Client disconnected; remove it from the set so future
42+
# broadcasts no longer target a dead connection.
43+
self.__class__.clients.discard(self)
3844

3945
def on_open(self, info):
4046
# Send that someone joined
@@ -62,12 +68,15 @@ def on_message(self, message):
6268
err.add_note(err_note)
6369
err_to_raise = err
6470
if err_to_raise:
65-
self.send({"error": err_note})
71+
try:
72+
self.send({"error": err_note})
73+
except WebSocketClosedError:
74+
self.__class__.clients.discard(self)
6675
raise err_to_raise
6776

6877
def on_close(self):
6978
# Remove client from the clients list and broadcast leave message
70-
self.__class__.clients.remove(self)
79+
self.__class__.clients.discard(self)
7180
self.broadcast(self.__class__.clients, "Someone left.")
7281

7382

biothings/utils/hub_db.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,20 +282,61 @@ class ChangeWatcher(object):
282282
"hub_config": "config",
283283
}
284284

285+
# Maximum number of events to coalesce per publish cycle. When the hub is
286+
# under heavy load (large builds, many workers) the event queue can fill
287+
# much faster than individual broadcasts can be sent, starving the Tornado
288+
# IOLoop and preventing HTTP responses and WebSocket heartbeats from being
289+
# processed. Draining in batches and deduplicating keeps the loop healthy.
290+
PUBLISH_BATCH_SIZE = 250
291+
285292
@classmethod
286293
def publish(cls):
287294
cls.do_publish = True
288295

289296
async def do():
290297
while cls.do_publish:
298+
# Block until at least one event is available.
291299
evt = await cls.event_queue.get()
292-
for listener in cls.listeners:
300+
batch = [evt]
301+
302+
# Drain any additional queued events up to the batch limit so
303+
# we can combine duplicate events and reduce broadcasts.
304+
while len(batch) < cls.PUBLISH_BATCH_SIZE:
293305
try:
294-
listener.read(evt)
295-
except Exception as e:
296-
# pass
297-
# TODO: the log line below was commented out, uncomment it to see it causes any issue
298-
logging.error("Can't publish %s to %s: %s", evt, listener, e)
306+
batch.append(cls.event_queue.get_nowait())
307+
except asyncio.QueueEmpty:
308+
break
309+
310+
# Deduplicate: keep only the *latest* event per (obj, _id) pair.
311+
# During a build the same source/build document is updated many
312+
# times in rapid succession — only the final state matters for
313+
# the UI. Log events (no "_id") are always forwarded.
314+
seen = {}
315+
unique_events = []
316+
for event in batch:
317+
obj = event.get("obj")
318+
_id = event.get("_id")
319+
if obj and _id:
320+
key = (obj, _id)
321+
if key in seen:
322+
# Replace the earlier event with this newer one.
323+
unique_events[seen[key]] = event
324+
else:
325+
seen[key] = len(unique_events)
326+
unique_events.append(event)
327+
else:
328+
unique_events.append(event)
329+
330+
for event in unique_events:
331+
for listener in cls.listeners:
332+
try:
333+
listener.read(event)
334+
except Exception as e:
335+
logging.error("Can't publish %s to %s: %s", event, listener, e)
336+
337+
# Yield control back to the IOLoop so HTTP handlers and
338+
# WebSocket heartbeats can be processed between batches.
339+
await asyncio.sleep(0)
299340

300341
return asyncio.ensure_future(do())
301342

0 commit comments

Comments
 (0)