@@ -134,7 +134,7 @@ def __init__(self, **kwargs):
134
134
self .ws_future = Future ()
135
135
self .disconnected = False
136
136
137
- async def _connect (self , kernel_id ):
137
+ async def _connect (self , kernel_id , message_callback ):
138
138
# websocket is initialized before connection
139
139
self .ws = None
140
140
self .kernel_id = kernel_id
@@ -150,6 +150,12 @@ async def _connect(self, kernel_id):
150
150
self .ws_future = websocket_connect (request )
151
151
self .ws_future .add_done_callback (self ._connection_done )
152
152
153
+ loop = IOLoop .current ()
154
+ loop .add_future (
155
+ self .ws_future ,
156
+ lambda future : self ._read_messages (message_callback )
157
+ )
158
+
153
159
def _connection_done (self , fut ):
154
160
if not self .disconnected and fut .exception () is None : # prevent concurrent.futures._base.CancelledError
155
161
self .ws = fut .result ()
@@ -188,18 +194,13 @@ async def _read_messages(self, callback):
188
194
189
195
if not self .disconnected : # if websocket is not disconnected by client, attept to reconnect to Gateway
190
196
self .log .info ("Attempting to re-establish the connection to Gateway: {}" .format (self .kernel_id ))
191
- self ._connect (self .kernel_id )
192
197
loop = IOLoop .current ()
193
- loop .add_future (self .ws_future , lambda future : self ._read_messages ( callback ) )
198
+ loop .spawn_callback (self ._connect , self .kernel_id , callback )
194
199
195
200
def on_open (self , kernel_id , message_callback , ** kwargs ):
196
201
"""Web socket connection open against gateway server."""
197
- self ._connect (kernel_id )
198
202
loop = IOLoop .current ()
199
- loop .add_future (
200
- self .ws_future ,
201
- lambda future : self ._read_messages (message_callback )
202
- )
203
+ loop .spawn_callback (self ._connect , kernel_id , message_callback )
203
204
204
205
def on_message (self , message ):
205
206
"""Send message to gateway server."""
0 commit comments