|
5 | 5 | import json |
6 | 6 | import random |
7 | 7 | import ssl |
| 8 | +import subprocess |
8 | 9 | import threading |
9 | 10 | from time import sleep |
10 | 11 | from typing import List, NoReturn, Union |
|
13 | 14 | from websocket import WebSocketApp, WebSocketConnectionClosedException |
14 | 15 | from websocket._abnf import ABNF |
15 | 16 |
|
| 17 | + |
16 | 18 | from livechat.utils.structures import RtmResponse |
17 | 19 |
|
18 | 20 |
|
19 | 21 | def on_message(ws_client: WebSocketApp, message: str): |
20 | 22 | ''' Custom WebSocketApp handler that inserts new messages in front of `self.messages` list. ''' |
21 | 23 | ws_client.messages.insert(0, json.loads(message)) |
22 | 24 |
|
| 25 | +def on_close(ws_client: WebSocketApp, close_status_code: int, close_msg: str): |
| 26 | + logger.info('websocket closed:') |
| 27 | + |
| 28 | + if close_status_code or close_msg: |
| 29 | + logger.info("close status code: " + str(close_status_code)) |
| 30 | + logger.info("close message: " + str(close_msg)) |
| 31 | + |
23 | 32 |
|
24 | 33 | class WebsocketClient(WebSocketApp): |
25 | 34 | ''' Custom extension of the WebSocketApp class for livechat python SDK. ''' |
@@ -77,22 +86,30 @@ def send(self, request: dict, opcode=ABNF.OPCODE_TEXT) -> dict: |
77 | 86 | RtmResponse: RTM response structure (`request_id`, `action`, |
78 | 87 | `type`, `success` and `payload` properties) |
79 | 88 | ''' |
80 | | - response_timeout = self.response_timeout |
81 | 89 | request_id = str(random.randint(1, 9999999999)) |
82 | 90 | request.update({'request_id': request_id}) |
83 | 91 | request_json = json.dumps(request, indent=4) |
84 | 92 | logger.info(f'\nREQUEST:\n{request_json}') |
| 93 | + |
85 | 94 | if not self.sock or self.sock.send(request_json, opcode) == 0: |
86 | 95 | raise WebSocketConnectionClosedException( |
87 | 96 | 'Connection is already closed.') |
88 | | - while not (response := next( |
89 | | - (item |
90 | | - for item in self.messages if item.get('request_id') == request_id |
91 | | - and item.get('type') == 'response'), |
92 | | - None)) and response_timeout > 0: |
| 97 | + |
| 98 | + response: dict |
| 99 | + def await_message(): |
| 100 | + for item in self.messages: |
| 101 | + if item.get('request_id') == request_id and item.get('type') == 'response': |
| 102 | + response = item |
| 103 | + return |
93 | 104 | sleep(0.2) |
94 | | - response_timeout -= 0.2 |
95 | | - logger.info(f'\nRESPONSE:\n{json.dumps(response, indent=4)}') |
| 105 | + try: |
| 106 | + subprocess.run(['await_message', self.response_timeout], timeout = 2) |
| 107 | + logger.info(f'\nRESPONSE:\n{json.dumps(response, indent=4)}') |
| 108 | + except subprocess.TimeoutExpired: |
| 109 | + logger.error(f'timed out waiting for message with request_id {request_id}') |
| 110 | + logger.debug('all websocket messages received before timeout:') |
| 111 | + logger.debug(self.messages) |
| 112 | + |
96 | 113 | return RtmResponse(response) |
97 | 114 |
|
98 | 115 | def _wait_till_sock_connected(self, |
|
0 commit comments