22Client for WebSocket connections.
33'''
44
5+ import concurrent .futures
56import json
67import random
78import ssl
@@ -21,12 +22,26 @@ def on_message(ws_client: WebSocketApp, message: str):
2122 ws_client .messages .insert (0 , json .loads (message ))
2223
2324
25+ def on_close (ws_client : WebSocketApp , close_status_code : int , close_msg : str ):
26+ logger .info ('websocket closed:' )
27+
28+ if close_status_code or close_msg :
29+ logger .info ('close status code: ' + str (close_status_code ))
30+ logger .info ('close message: ' + str (close_msg ))
31+
32+
33+ def on_error (ws_client : WebSocketApp , error : Exception ):
34+ logger .error (f'websocket error occurred: { str (error )} ' )
35+
36+
2437class WebsocketClient (WebSocketApp ):
2538 ''' Custom extension of the WebSocketApp class for livechat python SDK. '''
2639 def __init__ (self , * args , ** kwargs ):
2740 super ().__init__ (* args , ** kwargs )
2841 self .messages : List [dict ] = []
2942 self .on_message = on_message
43+ self .on_close = on_close
44+ self .on_error = on_error
3045 self .response_timeout = None
3146
3247 def open (self ,
@@ -77,22 +92,38 @@ def send(self, request: dict, opcode=ABNF.OPCODE_TEXT) -> dict:
7792 RtmResponse: RTM response structure (`request_id`, `action`,
7893 `type`, `success` and `payload` properties)
7994 '''
80- response_timeout = self .response_timeout
8195 request_id = str (random .randint (1 , 9999999999 ))
8296 request .update ({'request_id' : request_id })
8397 request_json = json .dumps (request , indent = 4 )
8498 logger .info (f'\n REQUEST:\n { request_json } ' )
99+
85100 if not self .sock or self .sock .send (request_json , opcode ) == 0 :
86101 raise WebSocketConnectionClosedException (
87102 'Connection is already closed.' )
88- while not (response := next (
89- (item
90- for item in self .messages if item .get ('request_id' ) == request_id
91- and item .get ('type' ) == 'response' ),
92- None )) and response_timeout > 0 :
93- sleep (0.2 )
94- response_timeout -= 0.2
95- logger .info (f'\n RESPONSE:\n { json .dumps (response , indent = 4 )} ' )
103+
104+ def await_message (stop_event : threading .Event ) -> dict :
105+ while not stop_event .is_set ():
106+ for item in self .messages :
107+ if item .get ('request_id' ) == request_id and item .get (
108+ 'type' ) == 'response' :
109+ return item
110+ sleep (0.2 )
111+
112+ with concurrent .futures .ThreadPoolExecutor () as executor :
113+ stop_event = threading .Event ()
114+ future = executor .submit (await_message , stop_event )
115+ try :
116+ response = future .result (timeout = self .response_timeout )
117+ logger .info (f'\n RESPONSE:\n { json .dumps (response , indent = 4 )} ' )
118+ except concurrent .futures .TimeoutError :
119+ stop_event .set ()
120+ logger .error (
121+ f'timed out waiting for message with request_id { request_id } '
122+ )
123+ logger .debug ('all websocket messages received before timeout:' )
124+ logger .debug (self .messages )
125+ return None
126+
96127 return RtmResponse (response )
97128
98129 def _wait_till_sock_connected (self ,
0 commit comments