Skip to content

Commit 4f1cd29

Browse files
authored
Websocket fix (#592)
* Encased 'WebSocket.send()' and 'WebSocket.close()' in try-except blocks * Added regular pinging when executing 'WebSocketApp.run_forever()'
1 parent 59297f9 commit 4f1cd29

File tree

3 files changed

+45
-28
lines changed

3 files changed

+45
-28
lines changed

src/murfey/client/__init__.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
import webbrowser
1212
from datetime import datetime
1313
from pathlib import Path
14+
from pprint import pprint
1415
from queue import Queue
15-
from typing import List, Literal
16+
from typing import Literal
1617
from urllib.parse import ParseResult, urlparse
1718

1819
import requests
@@ -40,7 +41,7 @@ def write_config(config: configparser.ConfigParser):
4041

4142

4243
def main_loop(
43-
source_watchers: List[murfey.client.watchdir.DirWatcher],
44+
source_watchers: list[murfey.client.watchdir.DirWatcher],
4445
appearance_time: float,
4546
transfer_all: bool,
4647
):
@@ -91,6 +92,7 @@ def _check_for_updates(
9192

9293

9394
def run():
95+
# Load client config and server information
9496
config = read_config()
9597
instrument_name = config["Murfey"]["instrument_name"]
9698
try:
@@ -109,6 +111,7 @@ def run():
109111
else:
110112
known_server = config["Murfey"].get("server")
111113

114+
# Set up argument parser with dynamic defaults based on client config
112115
parser = argparse.ArgumentParser(description="Start the Murfey client")
113116
parser.add_argument(
114117
"--server",
@@ -194,23 +197,23 @@ def run():
194197
default=False,
195198
help="Do not trigger processing for any data directories currently on disk (you may have started processing for them in a previous murfey run)",
196199
)
197-
198200
args = parser.parse_args()
199201

202+
# Logic to exit early based on parsed args
200203
if not args.server:
201204
exit("Murfey server not set. Please run with --server host:port")
202205
if not args.server.startswith(("http://", "https://")):
203206
if "://" in args.server:
204207
exit("Unknown server protocol. Only http:// and https:// are allowed")
205208
args.server = f"http://{args.server}"
206-
207209
if args.remove_files:
208210
remove_prompt = Confirm.ask(
209211
f"Are you sure you want to remove files from {args.source or Path('.').absolute()}?"
210212
)
211213
if not remove_prompt:
212214
exit("Exiting")
213215

216+
# If a new server URL is provided, save info to config file
214217
murfey_url = urlparse(args.server, allow_fragments=False)
215218
if args.server != known_server:
216219
# New server specified. Verify that it is real
@@ -232,8 +235,7 @@ def run():
232235
if args.no_transfer:
233236
log.info("No files will be transferred as --no-transfer flag was specified")
234237

235-
from pprint import pprint
236-
238+
# Check ISPyB (if set up) for ongoing visits
237239
ongoing_visits = []
238240
if args.visit:
239241
ongoing_visits = [args.visit]
@@ -250,35 +252,38 @@ def run():
250252

251253
_enable_webbrowser_in_cygwin()
252254

255+
# Set up additional log handlers
253256
log.setLevel(logging.DEBUG)
254257
log_queue = Queue()
255258
input_queue = Queue()
256259

257-
# rich_handler = DirectableRichHandler(log_queue, enable_link_path=False)
260+
# Rich-based console handler
258261
rich_handler = DirectableRichHandler(enable_link_path=False)
259262
rich_handler.setLevel(logging.DEBUG if args.debug else logging.INFO)
260263

264+
# Set up websocket app and handler
261265
client_id = requests.get(f"{murfey_url.geturl()}/new_client_id/").json()
262266
ws = murfey.client.websocket.WSApp(
263267
server=args.server,
264268
id=client_id["new_id"],
265269
)
270+
ws_handler = CustomHandler(ws.send)
266271

272+
# Add additional handlers and set logging levels
267273
logging.getLogger().addHandler(rich_handler)
268-
handler = CustomHandler(ws.send)
269-
logging.getLogger().addHandler(handler)
274+
logging.getLogger().addHandler(ws_handler)
270275
logging.getLogger("murfey").setLevel(logging.INFO)
271276
logging.getLogger("websocket").setLevel(logging.WARNING)
272277

273278
log.info("Starting Websocket connection")
274279

275-
status_bar = StatusBar()
276-
280+
# Load machine data for subsequent sections
277281
machine_data = requests.get(
278282
f"{murfey_url.geturl()}/instruments/{instrument_name}/machine"
279283
).json()
280284
gain_ref: Path | None = None
281285

286+
# Set up Murfey environment instance and map it to websocket app
282287
instance_environment = MurfeyInstanceEnvironment(
283288
url=murfey_url,
284289
client_id=ws.id,
@@ -295,9 +300,10 @@ def run():
295300
else ""
296301
),
297302
)
298-
299303
ws.environment = instance_environment
300304

305+
# Set up and run Murfey TUI app
306+
status_bar = StatusBar()
301307
rich_handler.redirect = True
302308
app = MurfeyTUI(
303309
environment=instance_environment,

src/murfey/client/websocket.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def _run_websocket_event_loop(self):
8787
backoff = 0
8888
while True:
8989
attempt_start = time.perf_counter()
90-
connection_failure = self._ws.run_forever()
90+
connection_failure = self._ws.run_forever(ping_interval=30, ping_timeout=10)
9191
if not connection_failure:
9292
break
9393
if (time.perf_counter() - attempt_start) < 5:
@@ -108,7 +108,10 @@ def _send_queue_feeder(self):
108108
continue
109109
while not self._ready:
110110
time.sleep(0.3)
111-
self._ws.send(element)
111+
try:
112+
self._ws.send(element)
113+
except Exception:
114+
log.error("Error sending message through websocket", exc_info=True)
112115
self._send_queue.task_done()
113116
log.debug("Websocket send-queue-feeder thread stopped")
114117

@@ -135,7 +138,10 @@ def close(self):
135138
self._send_queue.put(None)
136139
self._feeder_thread.join()
137140
self._receiver_thread.join()
138-
self._ws.close()
141+
try:
142+
self._ws.close()
143+
except Exception:
144+
log.error("Error closing websocket connection", exc_info=True)
139145

140146
def on_message(self, ws: websocket.WebSocketApp, message: str):
141147
self._receive_queue.put(message)

src/murfey/server/websocket.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
import json
55
import logging
66
from datetime import datetime
7-
from typing import Any, Dict, TypeVar, Union
7+
from typing import Any, TypeVar, Union
88

99
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
10-
from sqlmodel import select
10+
from sqlmodel import Session, select
1111

1212
import murfey.server.prometheus as prom
1313
from murfey.server.murfey_db import get_murfey_db_session
@@ -22,10 +22,13 @@
2222

2323
class ConnectionManager:
2424
def __init__(self):
25-
self.active_connections: Dict[int | str, WebSocket] = {}
25+
self.active_connections: dict[int | str, WebSocket] = {}
2626

2727
async def connect(
28-
self, websocket: WebSocket, client_id: int | str, register_client: bool = True
28+
self,
29+
websocket: WebSocket,
30+
client_id: Union[int, str],
31+
register_client: bool = True,
2932
):
3033
await websocket.accept()
3134
self.active_connections[client_id] = websocket
@@ -38,16 +41,17 @@ async def connect(
3841

3942
@staticmethod
4043
def _register_new_client(client_id: int):
44+
log.debug(f"Registering new client with ID {client_id}")
4145
new_client = ClientEnvironment(client_id=client_id, connected=True)
42-
murfey_db = next(get_murfey_db_session())
46+
murfey_db: Session = next(get_murfey_db_session())
4347
murfey_db.add(new_client)
4448
murfey_db.commit()
4549
murfey_db.close()
4650

47-
def disconnect(self, client_id: int | str, unregister_client: bool = True):
51+
def disconnect(self, client_id: Union[int, str], unregister_client: bool = True):
4852
self.active_connections.pop(client_id)
4953
if unregister_client:
50-
murfey_db = next(get_murfey_db_session())
54+
murfey_db: Session = next(get_murfey_db_session())
5155
client_env = murfey_db.exec(
5256
select(ClientEnvironment).where(
5357
ClientEnvironment.client_id == client_id
@@ -73,7 +77,7 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
7377
while True:
7478
data = await websocket.receive_text()
7579
try:
76-
json_data = json.loads(data)
80+
json_data: dict = json.loads(data)
7781
if json_data["type"] == "log": # and isinstance(json_data, dict)
7882
json_data.pop("type")
7983
await forward_log(json_data, websocket)
@@ -92,15 +96,16 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
9296

9397
@ws.websocket("/connect/{client_id}")
9498
async def websocket_connection_endpoint(
95-
websocket: WebSocket, client_id: Union[int, str]
99+
websocket: WebSocket,
100+
client_id: Union[int, str],
96101
):
97102
await manager.connect(websocket, client_id, register_client=False)
98103
await manager.broadcast(f"Client {client_id} joined")
99104
try:
100105
while True:
101106
data = await websocket.receive_text()
102107
try:
103-
json_data = json.loads(data)
108+
json_data: dict = json.loads(data)
104109
if json_data.get("type") == "log": # and isinstance(json_data, dict)
105110
json_data.pop("type")
106111
await forward_log(json_data, websocket)
@@ -115,12 +120,12 @@ async def websocket_connection_endpoint(
115120
await manager.broadcast(f"Client #{client_id} disconnected")
116121

117122

118-
async def check_connections(active_connections):
123+
async def check_connections(active_connections: list[WebSocket]):
119124
log.info("Checking connections")
120125
for connection in active_connections:
121126
log.info("Checking response")
122127
try:
123-
await asyncio.wait_for(connection.receive(), timeout=6)
128+
await asyncio.wait_for(connection.receive(), timeout=10)
124129
except asyncio.TimeoutError:
125130
log.info(f"Disconnecting Client {connection[0]}")
126131
manager.disconnect(connection[0], connection[1])
@@ -139,7 +144,7 @@ async def forward_log(logrecord: dict[str, Any], websocket: WebSocket):
139144

140145
@ws.delete("/test/{client_id}")
141146
async def close_ws_connection(client_id: int):
142-
murfey_db = next(get_murfey_db_session())
147+
murfey_db: Session = next(get_murfey_db_session())
143148
client_env = murfey_db.exec(
144149
select(ClientEnvironment).where(ClientEnvironment.client_id == client_id)
145150
).one()

0 commit comments

Comments
 (0)