1
1
import asyncio
2
2
import sys
3
3
from inspect import isawaitable , iscoroutinefunction
4
+ from typing import Callable , Dict , List
4
5
from IPython import get_ipython
6
+ from IPython .core .interactiveshell import ExecutionResult
5
7
from queue import Queue
6
8
from imjoy_rpc .utils import FuturePromise
9
+ from zmq .eventloop .zmqstream import ZMQStream
7
10
8
11
background_tasks = set ()
9
12
10
13
11
14
class Viewers (object ):
15
+ """This class is designed to track each instance of the Viewer class that
16
+ is instantiated as well as whether or not that instance is available for
17
+ updates or requests.
18
+ """
12
19
def __init__ (self ):
13
20
self ._data = {}
14
21
15
22
@property
16
- def data (self ):
23
+ def data (self ) -> Dict [str , Dict [str , bool ]]:
24
+ """Get the underlying data dict containg all viewer data
25
+
26
+ :return: A dict of key, value pairs mapping the unique Viewer name to a
27
+ dictionary containing a 'ready' key and a boolean value reflecting the
28
+ ready state of the Viewer.
29
+ :rtype: Dict[str, Dict[str, bool]]
30
+ """
17
31
return self ._data
18
32
19
33
@property
20
- def not_created (self ):
21
- # Return a list of names of viewers that have not been created yet
22
- names = []
23
- for key , val in self .data .items ():
24
- name = val ['name' ]
25
- if not val ['status' ]:
26
- name = name if name is not None else key
27
- names .append (name )
28
- return names
29
-
30
- @property
31
- def not_named (self ):
32
- # Return a list of names of viewers that have not been named yet
33
- return any ([k for k , v in self .data .items () if v ['name' ] is None ])
34
-
35
- @property
36
- def viewer_objects (self ):
37
- # Return a list of created viewers
38
- return list (self .data .keys ())
39
-
40
- def add_viewer (self , view ):
41
- self .data [view ] = {'name' : None , 'status' : False }
42
-
43
- def set_name (self , view , name ):
34
+ def not_created (self ) -> List [str ]:
35
+ """Return a list of all unavailable viewers
36
+
37
+ :return: A list of names of viewers that have not yet been created.
38
+ :rtype: List[str]
39
+ """
40
+ return [k for k in self .data .keys () if not self .viewer_ready (k )]
41
+
42
+ def add_viewer (self , view : str ) -> None :
43
+ """Add a new Viewer object to track.
44
+
45
+ :param view: The unique string identifier for the Viewer object
46
+ :type view: str
47
+ """
48
+ self .data [view ] = {"ready" : False }
49
+
50
+ def update_viewer_status (self , view : str , status : bool ) -> None :
51
+ """Update a Viewer's 'ready' status.
52
+
53
+ :param view: The unique string identifier for the Viewer object
54
+ :type view: str
55
+ :param status: Boolean value indicating whether or not the viewer is
56
+ available for requests or updates. This should be false when the plugin
57
+ API is not yet available or new data is not yet rendered.
58
+ :type status: bool
59
+ """
44
60
if view not in self .data .keys ():
45
61
self .add_viewer (view )
46
- self .data [view ]['name' ] = name
62
+ self .data [view ]["ready" ] = status
47
63
48
- def update_viewer_status (self , view , status ):
49
- if view not in self .data .keys ():
50
- self .add_viewer (view )
51
- self .data [view ]['status' ] = status
64
+ def viewer_ready (self , view : str ) -> bool :
65
+ """Request the 'ready' status of a viewer.
66
+
67
+ :param view: The unique string identifier for the Viewer object
68
+ :type view: str
52
69
53
- def viewer_ready (self , view ):
54
- if viewer := self .data .get (view ):
55
- return viewer ['status' ]
56
- return False
70
+ :return: Boolean value indicating whether or not the viewer is
71
+ available for requests or updates. This will be false when the plugin
72
+ API is not yet available or new data is not yet rendered.
73
+ :rtype: bool
74
+ """
75
+ return self .data .get (view , {}).get ("ready" , False )
57
76
58
77
59
78
class CellWatcher (object ):
79
+ """A singleton class used in interactive Jupyter notebooks in order to
80
+ support asynchronous network communication that would otherwise be blocked
81
+ by the IPython kernel.
82
+ """
83
+
60
84
def __new__ (cls ):
61
- if not hasattr (cls , '_instance' ):
85
+ """Create a singleton class."""
86
+ if not hasattr (cls , "_instance" ):
62
87
cls ._instance = super (CellWatcher , cls ).__new__ (cls )
63
88
cls ._instance .setup ()
64
89
return cls ._instance
65
90
66
- def setup (self ):
91
+ def setup (self ) -> None :
92
+ """Perform the initial setup, including intercepting 'execute_request'
93
+ handlers so that we can handle them internally before the IPython
94
+ kernel does.
95
+ """
67
96
self .viewers = Viewers ()
68
97
self .shell = get_ipython ()
69
98
self .kernel = self .shell .kernel
@@ -87,22 +116,52 @@ def setup(self):
87
116
88
117
# Call self.post_run_cell every time the post_run_cell signal is emitted
89
118
# post_run_cell runs after interactive execution (e.g. a cell in a notebook)
90
- self .shell .events .register ('post_run_cell' , self .post_run_cell )
119
+ self .shell .events .register ("post_run_cell" , self .post_run_cell )
120
+
121
+ def add_viewer (self , view : str ) -> None :
122
+ """Add a new Viewer object to track.
91
123
92
- def add_viewer (self , view ):
124
+ :param view: The unique string identifier for the Viewer object
125
+ :type view: str
126
+ """
93
127
# Track all Viewer instances
94
128
self .viewers .add_viewer (view )
95
129
96
- def update_viewer_status (self , view , status ):
130
+ def update_viewer_status (self , view : str , status : bool ) -> None :
131
+ """Update a Viewer's 'ready' status. If the last cell run failed
132
+ because the viewer was unavailable try to run the cell again.
133
+
134
+ :param view: The unique string identifier for the Viewer object
135
+ :type view: str
136
+ :param status: Boolean value indicating whether or not the viewer is
137
+ available for requests or updates. This should be false when the plugin
138
+ API is not yet available or new data is not yet rendered.
139
+ :type status: bool
140
+ """
97
141
self .viewers .update_viewer_status (view , status )
98
- if self .waiting_on_viewer :
142
+ if status and self .waiting_on_viewer :
99
143
# Might be ready now, try again
100
144
self .create_task (self .execute_next_request )
101
145
102
- def viewer_ready (self , view ):
146
+ def viewer_ready (self , view : str ) -> bool :
147
+ """Request the 'ready' status of a viewer.
148
+
149
+ :param view: The unique string identifier for the Viewer object
150
+ :type view: str
151
+
152
+ :return: Boolean value indicating whether or not the viewer is
153
+ available for requests or updates. This will be false when the plugin
154
+ API is not yet available or new data is not yet rendered.
155
+ :rtype: bool
156
+ """
103
157
return self .viewers .viewer_ready (view )
104
158
105
- def _task_cleanup (self , task ):
159
+ def _task_cleanup (self , task : asyncio .Task ) -> None :
160
+ """Callback to discard references to tasks once they've completed.
161
+
162
+ :param task: Completed task that no longer needs a strong reference
163
+ :type task: asyncio.Task
164
+ """
106
165
global background_tasks
107
166
try :
108
167
# "Handle" exceptions here to prevent further errors. Exceptions
@@ -112,40 +171,83 @@ def _task_cleanup(self, task):
112
171
except :
113
172
background_tasks .discard (task )
114
173
115
- def create_task (self , fn ):
174
+ def create_task (self , fn : Callable ) -> None :
175
+ """Create a task from the function passed in.
176
+
177
+ :param fn: Coroutine to run concurrently as a Task
178
+ :type fn: Callable
179
+ """
116
180
global background_tasks
117
181
# The event loop only keeps weak references to tasks.
118
182
# Gather them into a set to avoid garbage collection mid-task.
119
183
task = asyncio .create_task (fn ())
120
184
background_tasks .add (task )
121
185
task .add_done_callback (self ._task_cleanup )
122
186
123
- def capture_event (self , stream , ident , parent ):
187
+ def capture_event (self , stream : ZMQStream , ident : list , parent : dict ) -> None :
188
+ """Capture execute_request messages so that we can queue and process
189
+ them concurrently as tasks to prevent blocking.
190
+
191
+ :param stream: Class to manage event-based messaging on a zmq socket
192
+ :type stream: ZMQStream
193
+ :param ident: ZeroMQ routing prefix, which can be zero or more socket
194
+ identities
195
+ :type ident: list
196
+ :param parent: A dictonary of dictionaries representing a complete
197
+ message as defined by the Jupyter message specification
198
+ :type parent: dict
199
+ """
124
200
self ._events .put ((stream , ident , parent ))
125
201
if self ._events .qsize () == 1 and self .ready_to_run_next_cell ():
126
202
# We've added a new task to an empty queue.
127
203
# Begin executing tasks again.
128
204
self .create_task (self .execute_next_request )
129
205
130
- async def capture_event_async (self , stream , ident , parent ):
206
+ async def capture_event_async (
207
+ self , stream : ZMQStream , ident : list , parent : dict
208
+ ) -> None :
209
+ """Capture execute_request messages so that we can queue and process
210
+ them concurrently as tasks to prevent blocking.
211
+ Asynchronous for ipykernel 6+.
212
+
213
+ :param stream: Class to manage event-based messaging on a zmq socket
214
+ :type stream: ZMQStream
215
+ :param ident: ZeroMQ routing prefix, which can be zero or more socket
216
+ identities
217
+ :type ident: list
218
+ :param parent: A dictonary of dictionaries representing a complete
219
+ message as defined by the Jupyter message specification
220
+ :type parent: dict
221
+ """
131
222
# ipykernel 6+
132
223
self .capture_event (stream , ident , parent )
133
224
134
225
@property
135
- def all_getters_resolved (self ):
136
- # Check if all of the getter/setter futures have resolved
226
+ def all_getters_resolved (self ) -> bool :
227
+ """Determine if all tasks representing asynchronous network calls that
228
+ fetch values have resolved.
229
+
230
+ :return: Whether or not all tasks for the current cell have resolved
231
+ :rtype: bool
232
+ """
137
233
getters_resolved = [f .done () for f in self .results .values ()]
138
234
return all (getters_resolved )
139
235
140
- def ready_to_run_next_cell (self ):
141
- # Any itk_viewer objects need to be available and all getters/setters
142
- # need to be resolved
236
+ def ready_to_run_next_cell (self ) -> bool :
237
+ """Determine if we are ready to run the next cell in the queue.
238
+
239
+ :return: If created Viewer objects are available and all futures are
240
+ resolved.
241
+ :rtype: bool
242
+ """
143
243
self .waiting_on_viewer = len (self .viewers .not_created )
144
244
return self .all_getters_resolved and not self .waiting_on_viewer
145
245
146
- async def execute_next_request (self ):
147
- # Modeled after the approach used in jupyter-ui-poll
148
- # https://github.com/Kirill888/jupyter-ui-poll/blob/f65b81f95623c699ed7fd66a92be6d40feb73cde/jupyter_ui_poll/_poll.py#L75-L101
246
+ async def execute_next_request (self ) -> None :
247
+ """Grab the next request if needed and then run the cell if it it ready
248
+ to be run. Modeled after the approach used in jupyter-ui-poll.
249
+ :ref: https://github.com/Kirill888/jupyter-ui-poll/blob/f65b81f95623c699ed7fd66a92be6d40feb73cde/jupyter_ui_poll/_poll.py#L75-L101
250
+ """
149
251
if self ._events .empty ():
150
252
self .abort_all = False
151
253
@@ -157,7 +259,8 @@ async def execute_next_request(self):
157
259
# Continue processing the remaining queued tasks
158
260
await self ._execute_next_request ()
159
261
160
- async def _execute_next_request (self ):
262
+ async def _execute_next_request (self ) -> None :
263
+ """Run the cell with the ipykernel shell_handler for execute_request"""
161
264
# Here we actually run the queued cell as it would have been run
162
265
stream , ident , parent = self .current_request
163
266
@@ -187,32 +290,38 @@ async def _execute_next_request(self):
187
290
# Continue processing the remaining queued tasks
188
291
self .create_task (self .execute_next_request )
189
292
190
- def update_namespace (self ):
191
- # Update the namespace variables with the results from the getters
293
+ def update_namespace (self ) -> None :
294
+ """ Update the namespace variables with the results from the getters"""
192
295
# FIXME: This is a temporary "fix" and does not handle updating output
193
296
keys = [k for k in self .shell .user_ns .keys ()]
194
- try :
195
- for key in keys :
196
- value = self . shell . user_ns [ key ]
197
- if asyncio . isfuture ( value ) and ( isinstance (value , FuturePromise ) or isinstance (value , asyncio .Task )):
198
- # Getters/setters return futures
199
- # They should all be resolved now, so use the result
200
- self . shell . user_ns [ key ] = value . result ()
201
- self . results . clear ()
202
- except Exception as e :
203
- self .results .clear ()
204
- self . abort_all = True
205
- self . create_task ( self . _execute_next_request )
206
- raise e
207
-
208
- def _callback ( self , * args , ** kwargs ):
297
+ for key in keys :
298
+ value = self . shell . user_ns [ key ]
299
+ if asyncio . isfuture ( value ) and (
300
+ isinstance (value , FuturePromise ) or isinstance (value , asyncio .Task )
301
+ ):
302
+ # Functions that need to return values from asynchronous
303
+ # network requests return futures. They should all be resolved
304
+ # now, so use the result.
305
+ self . shell . user_ns [ key ] = value . result ()
306
+ self .results .clear ()
307
+
308
+ def _callback ( self , * args , ** kwargs ) -> None :
309
+ """After each future resolves check to see if they are all resolved. If
310
+ so, update the namespace and run the next cell in the queue.
311
+ """
209
312
# After each getter/setter resolves check if they've all resolved
210
313
if self .all_getters_resolved :
211
314
self .update_namespace ()
212
315
self .current_request = None
213
316
self .create_task (self .execute_next_request )
214
317
215
- def post_run_cell (self , response ):
318
+ def post_run_cell (self , response : ExecutionResult ) -> None :
319
+ """Runs after interactive execution (e.g. a cell in a notebook). Set
320
+ the abort flag if there are errors produced by cell execution.
321
+
322
+ :param response: The response message produced by cell execution
323
+ :type response: ExecutionResult
324
+ """
216
325
# Abort remaining cells on error in execution
217
326
if response .error_in_exec is not None :
218
327
self .abort_all = True
0 commit comments