11import logging
22import socket
3- from queue import Queue
4- from queue import ShutDown
53from typing import cast
64
75from PyQt6 .QtCore import QByteArray
2119logger = logging .getLogger (__name__ )
2220
2321
24- class SocketReader (QThread ):
22+ class SocketReader (QObject ):
2523 message_received = pyqtSignal (QByteArray )
26- error = pyqtSignal ()
24+ error = pyqtSignal (object )
2725
28- def __init__ (self , connection : ClientConnection ) -> None :
26+ def __init__ (self , connection : ClientConnection | None = None ) -> None :
2927 super ().__init__ ()
30- self .connection = connection
28+ self .connection : ClientConnection | None = connection
3129
32- def run (self ) -> None :
30+ def set_connection (self , conn : ClientConnection , / ) -> None :
31+ self .connection = conn
32+
33+ def read (self ) -> None :
34+ if self .connection is None :
35+ logger .warning ("Trying to read without any connection" )
36+ return
3337 try :
3438 for message in self .connection :
3539 self .message_received .emit (QByteArray (cast (bytes , message )))
3640 except (ConnectionAbortedError , ConnectionClosedError ) as e :
37- logger .error (e )
38- self .error .emit ()
39- return
40-
41-
42- class SocketWriter (QThread ):
43- error = pyqtSignal ()
44-
45- def __init__ (self , connection : ClientConnection , send_queue : Queue [bytes ]) -> None :
46- super ().__init__ ()
47- self .send_queue = send_queue
48- self .connection = connection
49-
50- def run (self ) -> None :
51- while True :
52- try :
53- item = self .send_queue .get ()
54- except ShutDown :
55- return
56- try :
57- self .connection .send (item )
58- except (ConnectionAbortedError , ConnectionClosedError ) as e :
59- logger .error (e )
60- self .error .emit ()
61- return
62- self .send_queue .task_done ()
41+ self .error .emit (e )
6342
6443
6544class Websocket (QObject ):
6645 binaryMessageReceived = pyqtSignal (QByteArray )
6746 errorOccurred = pyqtSignal (QAbstractSocket .SocketError )
6847 stateChanged = pyqtSignal (QAbstractSocket .SocketState )
48+ _start_read = pyqtSignal ()
6949
7050 def __init__ (self , addresses : list [QHostAddress ]) -> None :
7151 super ().__init__ ()
7252 self .addresses = addresses
7353 self .socket : socket .socket | None = None
74- self .send_queue : Queue [bytes ] | None = None
7554 self ._sock_state = QAbstractSocket .SocketState .UnconnectedState
7655
77- self .reader_thread : SocketReader | None = None
78- self .writer_thread : SocketWriter | None = None
56+ self .reader_thread = QThread ()
57+
58+ self .reader = SocketReader ()
59+ self .reader .moveToThread (self .reader_thread )
60+ self .reader .message_received .connect (self .binaryMessageReceived .emit )
61+ self .reader .error .connect (self .handle_error )
62+ self ._start_read .connect (self .reader .read )
63+
7964 self .connection : ClientConnection | None = None
8065
8166 self ._states = (
@@ -106,13 +91,14 @@ def sock_state(self, value: State) -> None:
10691 self ._sock_state = self ._states [value ]
10792 self .stateChanged .emit (self ._sock_state )
10893
109- def sync_state (self ) -> None :
110- assert self .connection is not None
111- self .sock_state = self .connection .state
112-
11394 def sendBinaryMessage (self , message : bytes ) -> None :
114- assert self .send_queue is not None
115- self .send_queue .put (message )
95+ if self .connection is not None :
96+ try :
97+ self .connection .send (message )
98+ except (ConnectionAbortedError , ConnectionClosedError ) as e :
99+ self .handle_error (e )
100+ else :
101+ logger .warning ("Trying to write without any connection" )
116102
117103 def state (self ) -> QAbstractSocket .SocketState :
118104 return self ._sock_state
@@ -121,30 +107,19 @@ def errorString(self) -> str:
121107 return "[Not implemented]"
122108
123109 def close (self ) -> None :
124- self ._sock_state = QAbstractSocket .SocketState .UnconnectedState
125- self .stateChanged .emit (self ._sock_state )
126-
127- if self .send_queue is not None :
128- self .send_queue .shutdown ()
129- self .send_queue = None
130- if self .reader_thread is not None :
131- self .reader_thread .quit ()
132- self .reader_thread = None
133- if self .writer_thread is not None :
134- self .writer_thread .quit ()
135- self .writer_thread = None
136110 if self .connection is not None :
137111 self .connection .close ()
138- self .connection = None
112+ self .connection = None
139113 self .socket = None
114+ self .sock_state = State .CLOSED
140115
141- def reader_writer_error (self ) -> None :
142- self . sync_state ( )
116+ def handle_error (self , error : Exception ) -> None :
117+ logger . error ( error )
143118 self .errorOccurred .emit (QAbstractSocket .SocketError .NetworkError )
119+ self .close ()
144120
145121 def open (self , url : QUrl ) -> None :
146- self ._sock_state = QAbstractSocket .SocketState .ConnectingState
147- self .stateChanged .emit (self ._sock_state )
122+ self .sock_state = State .CONNECTING
148123
149124 self .connect ()
150125 assert self .socket is not None
@@ -158,18 +133,9 @@ def open(self, url: QUrl) -> None:
158133 self .connection .debug = False
159134 self .connection .protocol .debug = False
160135
161- self .start_read_write ()
162- self .sync_state ()
136+ self .reader .set_connection (self .connection )
137+ if not self .reader_thread .isRunning ():
138+ self .reader_thread .start ()
139+ self ._start_read .emit ()
163140
164- def start_read_write (self ) -> None :
165- assert self .connection is not None
166-
167- self .reader_thread = SocketReader (self .connection )
168- self .reader_thread .message_received .connect (self .binaryMessageReceived .emit )
169- self .reader_thread .error .connect (self .reader_writer_error )
170- self .reader_thread .start ()
171-
172- self .send_queue = Queue ()
173- self .writer_thread = SocketWriter (self .connection , self .send_queue )
174- self .writer_thread .error .connect (self .reader_writer_error )
175- self .writer_thread .start ()
141+ self .sock_state = self .connection .state
0 commit comments