@@ -131,7 +131,7 @@ def __init__(self, **kwargs):
131
131
self .ws_future = Future ()
132
132
self .disconnected = False
133
133
134
- async def _connect (self , kernel_id ):
134
+ async def _connect (self , kernel_id , message_callback ):
135
135
# websocket is initialized before connection
136
136
self .ws = None
137
137
self .kernel_id = kernel_id
@@ -147,6 +147,12 @@ async def _connect(self, kernel_id):
147
147
self .ws_future = websocket_connect (request )
148
148
self .ws_future .add_done_callback (self ._connection_done )
149
149
150
+ loop = IOLoop .current ()
151
+ loop .add_future (
152
+ self .ws_future ,
153
+ lambda future : self ._read_messages (message_callback )
154
+ )
155
+
150
156
def _connection_done (self , fut ):
151
157
if not self .disconnected and fut .exception () is None : # prevent concurrent.futures._base.CancelledError
152
158
self .ws = fut .result ()
@@ -185,18 +191,13 @@ async def _read_messages(self, callback):
185
191
186
192
if not self .disconnected : # if websocket is not disconnected by client, attept to reconnect to Gateway
187
193
self .log .info ("Attempting to re-establish the connection to Gateway: {}" .format (self .kernel_id ))
188
- self ._connect (self .kernel_id )
189
194
loop = IOLoop .current ()
190
- loop .add_future (self .ws_future , lambda future : self ._read_messages ( callback ) )
195
+ loop .spawn_callback (self ._connect , self .kernel_id , callback )
191
196
192
197
def on_open (self , kernel_id , message_callback , ** kwargs ):
193
198
"""Web socket connection open against gateway server."""
194
- self ._connect (kernel_id )
195
199
loop = IOLoop .current ()
196
- loop .add_future (
197
- self .ws_future ,
198
- lambda future : self ._read_messages (message_callback )
199
- )
200
+ loop .spawn_callback (self ._connect , kernel_id , message_callback )
200
201
201
202
def on_message (self , message ):
202
203
"""Send message to gateway server."""
0 commit comments