@@ -41,8 +41,10 @@ def _get_stderr_fileno() -> Optional[int]:
41
41
42
42
43
43
class Transport (ABC ):
44
- def __init__ (self ) -> None :
44
+ def __init__ (self , loop : asyncio .AbstractEventLoop ) -> None :
45
+ self ._loop = loop
45
46
self .on_message = lambda _ : None
47
+ self .on_error_future : asyncio .Future = loop .create_future ()
46
48
47
49
@abstractmethod
48
50
def request_stop (self ) -> None :
@@ -55,9 +57,9 @@ def dispose(self) -> None:
55
57
async def wait_until_stopped (self ) -> None :
56
58
pass
57
59
60
+ @abstractmethod
58
61
async def run (self ) -> None :
59
- self ._loop = asyncio .get_running_loop ()
60
- self .on_error_future : asyncio .Future = asyncio .Future ()
62
+ pass
61
63
62
64
@abstractmethod
63
65
def send (self , message : Dict ) -> None :
@@ -78,11 +80,12 @@ def deserialize_message(self, data: bytes) -> Any:
78
80
79
81
80
82
class PipeTransport (Transport ):
81
- def __init__ (self , driver_executable : Path ) -> None :
82
- super ().__init__ ()
83
+ def __init__ (
84
+ self , loop : asyncio .AbstractEventLoop , driver_executable : Path
85
+ ) -> None :
86
+ super ().__init__ (loop )
83
87
self ._stopped = False
84
88
self ._driver_executable = driver_executable
85
- self ._loop : asyncio .AbstractEventLoop
86
89
87
90
def request_stop (self ) -> None :
88
91
self ._stopped = True
@@ -93,17 +96,21 @@ async def wait_until_stopped(self) -> None:
93
96
await self ._proc .wait ()
94
97
95
98
async def run (self ) -> None :
96
- await super ().run ()
97
99
self ._stopped_future : asyncio .Future = asyncio .Future ()
98
100
99
- self ._proc = proc = await asyncio .create_subprocess_exec (
100
- str (self ._driver_executable ),
101
- "run-driver" ,
102
- stdin = asyncio .subprocess .PIPE ,
103
- stdout = asyncio .subprocess .PIPE ,
104
- stderr = _get_stderr_fileno (),
105
- limit = 32768 ,
106
- )
101
+ try :
102
+ self ._proc = proc = await asyncio .create_subprocess_exec (
103
+ str (self ._driver_executable ),
104
+ "run-driver" ,
105
+ stdin = asyncio .subprocess .PIPE ,
106
+ stdout = asyncio .subprocess .PIPE ,
107
+ stderr = _get_stderr_fileno (),
108
+ limit = 32768 ,
109
+ )
110
+ except Exception as exc :
111
+ self .on_error_future .set_exception (exc )
112
+ return
113
+
107
114
assert proc .stdout
108
115
assert proc .stdin
109
116
self ._output = proc .stdin
@@ -138,16 +145,17 @@ def send(self, message: Dict) -> None:
138
145
139
146
class WebSocketTransport (AsyncIOEventEmitter , Transport ):
140
147
def __init__ (
141
- self , ws_endpoint : str , timeout : float = None , headers : Dict [str , str ] = None
148
+ self ,
149
+ loop : asyncio .AbstractEventLoop ,
150
+ ws_endpoint : str ,
151
+ headers : Dict [str , str ] = None ,
142
152
) -> None :
143
- super ().__init__ ()
144
- Transport .__init__ (self )
153
+ super ().__init__ (loop )
154
+ Transport .__init__ (self , loop )
145
155
146
156
self ._stopped = False
147
157
self .ws_endpoint = ws_endpoint
148
- self .timeout = timeout
149
158
self .headers = headers
150
- self ._loop : asyncio .AbstractEventLoop
151
159
152
160
def request_stop (self ) -> None :
153
161
self ._stopped = True
@@ -160,15 +168,13 @@ async def wait_until_stopped(self) -> None:
160
168
await self ._connection .wait_closed ()
161
169
162
170
async def run (self ) -> None :
163
- await super ().run ()
164
-
165
- options : Dict [str , Any ] = {}
166
- if self .timeout is not None :
167
- options ["close_timeout" ] = self .timeout / 1000
168
- options ["ping_timeout" ] = self .timeout / 1000
169
- if self .headers is not None :
170
- options ["extra_headers" ] = self .headers
171
- self ._connection = await websockets .connect (self .ws_endpoint , ** options )
171
+ try :
172
+ self ._connection = await websockets .connect (
173
+ self .ws_endpoint , extra_headers = self .headers
174
+ )
175
+ except Exception as exc :
176
+ self .on_error_future .set_exception (Error (f"websocket.connect: { str (exc )} " ))
177
+ return
172
178
173
179
while not self ._stopped :
174
180
try :
@@ -188,8 +194,8 @@ async def run(self) -> None:
188
194
)
189
195
break
190
196
except Exception as exc :
191
- print (f"Received unhandled exception: { exc } " )
192
197
self .on_error_future .set_exception (exc )
198
+ break
193
199
194
200
def send (self , message : Dict ) -> None :
195
201
if self ._stopped or self ._connection .closed :
0 commit comments