@@ -48,15 +48,15 @@ def base36(n):
4848
4949
5050class CCAPI (RootAPIMixin ):
51- def __init__ (self , nid , program , cleanup_callback ):
51+ def __init__ (self , nid , program , sender ):
5252 self ._id = nid
5353 self ._task_autoid = 1
54- self ._cmd = asyncio .Queue (maxsize = 1 )
5554 self ._result_locks = {}
5655 self ._result_values = {}
5756 self ._result_queues = {}
5857 self ._event_to_tids = {}
5958 self ._tid_to_event = {}
59+ self ._sender = sender
6060
6161 self .colors = ColorsAPI (self , 'colors' )
6262 self .commands = CommandsAPI (self , 'commands' )
@@ -81,27 +81,32 @@ def __init__(self, nid, program, cleanup_callback):
8181
8282 async def prog_wrap ():
8383 err = None
84+ cancel = False
8485 try :
8586 await program (self )
8687 except asyncio .CancelledError :
8788 print ('program {} cancelled' .format (self ._id ))
8889 print_exc ()
8990 err = 'program has been cancelled'
91+ cancel = True
9092 except Exception as e :
9193 print ('program {} crashed: {} {}' .format (self ._id , type (e ), e ))
9294 print_exc ()
9395 err = type (e ).__name__ + ': ' + str (e )
9496 else :
9597 print ('program {} finished' .format (self ._id ))
9698 finally :
97- c = { 'action' : 'close' }
98- if err is not None :
99- c [ 'error' ] = err
100- await self . _cmd . put ( c )
101- cleanup_callback ( )
99+ if not cancel :
100+ c = { 'action' : 'close' }
101+ if err is not None :
102+ c [ 'error' ] = err
103+ await self . _sender ( c )
102104
103105 self ._task = asyncio .create_task (prog_wrap ())
104106
107+ def cancel (self ):
108+ self ._task .cancel ()
109+
105110 def _new_task_id (self ) -> str :
106111 task_id = base36 (self ._task_autoid )
107112 self ._task_autoid += 1
@@ -110,7 +115,7 @@ def _new_task_id(self) -> str:
110115 async def _eval (self , lua_code , immediate = False ):
111116 task_id = self ._new_task_id ()
112117 self ._result_locks [task_id ] = asyncio .Event ()
113- await self ._cmd . put ({
118+ await self ._sender ({
114119 'action' : 'task' ,
115120 'task_id' : task_id ,
116121 'code' : lua_code ,
@@ -133,7 +138,7 @@ async def _start_queue(self, event):
133138 self ._result_queues [task_id ] = asyncio .Queue ()
134139 es = self ._event_to_tids .setdefault (event , set ())
135140 if not es :
136- await self ._cmd . put ({
141+ await self ._sender ({
137142 'action' : 'sub' ,
138143 'event' : event ,
139144 })
@@ -147,7 +152,7 @@ async def _stop_queue(self, task_id):
147152 del self ._tid_to_event [task_id ]
148153 self ._event_to_tids [event ].discard (task_id )
149154 if not self ._event_to_tids [event ]:
150- await self ._cmd . put ({
155+ await self ._sender ({
151156 'action' : 'unsub' ,
152157 'event' : event ,
153158 })
@@ -166,16 +171,6 @@ async def _create_temp_object(self, create_expr: str, finalizer_template: str =
166171
167172
168173class CCApplication (web .Application ):
169- @staticmethod
170- async def _sender (ws , api ):
171- while not ws .closed :
172- cmd = await api ._cmd .get ()
173- # print(f'_sender: {cmd}')
174- if not ws .closed :
175- await ws .send_json (cmd )
176- if cmd ['action' ] == 'close' :
177- break
178-
179174 @staticmethod
180175 async def _json_messages (ws ):
181176 async for msg in ws :
@@ -207,29 +202,32 @@ async def _launch_program(self, ws):
207202 'error' : "program doesn't exist" ,
208203 })
209204 return None
210- return CCAPI (msg ['computer' ], program , lambda : None )
205+
206+ return CCAPI (msg ['computer' ], program , ws .send_json )
211207
212208 async def ws (self , request ):
213209 ws = web .WebSocketResponse ()
214210 await ws .prepare (request )
215211
216212 api = await self ._launch_program (ws )
217213 if api is not None :
218- asyncio .create_task (self ._sender (ws , api ))
219- async for msg in self ._json_messages (ws ):
220- if msg ['action' ] == 'event' :
221- for task_id in api ._event_to_tids .get (msg ['event' ], ()):
222- await api ._result_queues [task_id ].put (msg ['params' ])
223- elif msg ['action' ] == 'task_result' :
224- api ._result_values [msg ['task_id' ]] = msg ['result' ]
225- api ._result_locks [msg ['task_id' ]].set ()
226- # print(msg['task_id'], msg['yields'])
227- else :
228- await ws .send_json ({
229- 'action' : 'close' ,
230- 'error' : 'protocol error' ,
231- })
232- break
214+ try :
215+ async for msg in self ._json_messages (ws ):
216+ if msg ['action' ] == 'event' :
217+ for task_id in api ._event_to_tids .get (msg ['event' ], ()):
218+ await api ._result_queues [task_id ].put (msg ['params' ])
219+ elif msg ['action' ] == 'task_result' :
220+ api ._result_values [msg ['task_id' ]] = msg ['result' ]
221+ api ._result_locks [msg ['task_id' ]].set ()
222+ # print(msg['task_id'], msg['yields'])
223+ else :
224+ await ws .send_json ({
225+ 'action' : 'close' ,
226+ 'error' : 'protocol error' ,
227+ })
228+ break
229+ finally :
230+ api .cancel ()
233231
234232 return ws
235233
0 commit comments