33import string
44from aiohttp import web
55from traceback import print_exc
6- from os .path import getmtime , join , dirname , abspath
7- from os import listdir , getcwd
8- import importlib
9- import importlib .util
6+ from os .path import join , dirname , abspath
7+ from importlib import import_module
108import argparse
119
1210from .subapis .root import RootAPIMixin
3432
3533THIS_DIR = dirname (abspath (__file__ ))
3634LUA_FILE = join (THIS_DIR , 'back.lua' )
37- CURRENT_DIR = getcwd ()
38-
39-
40- exchange = {}
41- module_map = {}
42-
43-
4435DIGITS = string .digits + string .ascii_lowercase
4536
4637
@@ -58,48 +49,8 @@ async def lua_json(request):
5849 return json .loads (body )
5950
6051
61- def program_filenames ():
62- return [f [:- 3 ] for f in listdir (CURRENT_DIR ) if f .endswith ('.py' ) and f != '__init__.py' ]
63-
64-
65- def m_filename (m ):
66- return join (CURRENT_DIR , '{}.py' .format (m ))
67-
68-
69- async def reload_all_modules (module_map ):
70- prev = set (module_map .keys ())
71- nxt = set (program_filenames ())
72-
73- # unloading old modules
74- for m in prev - nxt :
75- del module_map [m ]
76-
77- # loading new modules
78- for m in nxt - prev :
79- spec = importlib .util .spec_from_file_location ('ccprograms.{}' .format (m ), m_filename (m ))
80- module_map [m ] = importlib .util .module_from_spec (spec )
81- spec .loader .exec_module (module_map [m ])
82- module_map [m ]._mtime_mark = getmtime (m_filename (m ))
83- print ('Loaded {}' .format (m ))
84-
85- # reloading modified modules
86- for m in nxt & prev :
87- mtime = getmtime (m_filename (m ))
88- if module_map [m ]._mtime_mark < mtime :
89- importlib .reload (module_map [m ])
90- module_map [m ]._mtime_mark = mtime
91- print ('Reloaded {}' .format (m ))
92-
93-
94- async def module_reloader ():
95- while True :
96- fut = asyncio .ensure_future (reload_all_modules (module_map ))
97- await asyncio .wait ([fut ])
98- await asyncio .sleep (5 )
99-
100-
10152class CCAPI (RootAPIMixin ):
102- def __init__ (self , nid , program ):
53+ def __init__ (self , nid , program , cleanup_callback ):
10354 self ._id = nid
10455 self ._task_autoid = 1
10556 self ._cmd = asyncio .Queue (maxsize = 1 )
@@ -142,7 +93,7 @@ async def prog_wrap():
14293 finally :
14394 if not cancel :
14495 await self ._cmd .put ('END' )
145- del exchange [ self . _id ]
96+ cleanup_callback ()
14697
14798 self ._task = asyncio .ensure_future (prog_wrap ())
14899
@@ -174,47 +125,6 @@ async def _stop_queue(self, task_id):
174125 del self ._result_queues [task_id ]
175126
176127
177- async def start (request ):
178- tid = int (request .match_info ['turtle' ])
179- if tid in exchange :
180- # terminate old program
181- exchange [tid ]._task .cancel ()
182- exchange [tid ] = CCAPI (tid , module_map [request .match_info ['program' ]].program )
183- return web .Response (text = '' )
184-
185-
186- async def gettask (request ):
187- api = exchange .get (int (request .match_info ['turtle' ]))
188- if api is None :
189- return web .Response (text = 'END' )
190- return web .Response (text = await api ._cmd .get ())
191-
192-
193- async def taskresult (request ):
194- api = exchange .get (int (request .match_info ['turtle' ]))
195- if api is not None :
196- tid = request .match_info ['task_id' ]
197- if tid in api ._result_locks :
198- # it's a TASK
199- api ._result_values [tid ] = await lua_json (request )
200- api ._result_locks [tid ].set ()
201- elif tid in api ._result_queues :
202- # it's a QUEUE
203- await api ._result_queues [tid ].put (await lua_json (request ))
204- # otherwise just ignore
205- return web .Response (text = '' )
206-
207-
208- def backdoor (request ):
209- with open (LUA_FILE , 'r' ) as f :
210- fcont = f .read ()
211- fcont = fcont .replace (
212- "local url = 'http://127.0.0.1:4343/'" ,
213- "local url = '{}://{}/'" .format (request .scheme , request .host )
214- )
215- return web .Response (text = fcont )
216-
217-
218128logging_config = '''
219129version: 1
220130disable_existing_loggers: false
@@ -244,10 +154,65 @@ def enable_request_logging():
244154 logging .config .dictConfig (yaml .load (logging_config ))
245155
246156
157+ class CCApplication (web .Application ):
158+ async def start (self , request ):
159+ tid = int (request .match_info ['turtle' ])
160+ if tid in self ['exchange' ]:
161+ # terminate old program
162+ self ['exchange' ][tid ]._task .cancel ()
163+ module = import_module (self ['source_module' ])
164+ program = getattr (module , request .match_info ['program' ])
165+
166+ def cleanup_callback ():
167+ del self ['exchange' ][tid ]
168+
169+ self ['exchange' ][tid ] = CCAPI (tid , program , cleanup_callback )
170+ return web .Response (text = '' )
171+
172+ async def gettask (self , request ):
173+ api = self ['exchange' ].get (int (request .match_info ['turtle' ]))
174+ if api is None :
175+ return web .Response (text = 'END' )
176+ return web .Response (text = await api ._cmd .get ())
177+
178+ async def taskresult (self , request ):
179+ api = self ['exchange' ].get (int (request .match_info ['turtle' ]))
180+ if api is not None :
181+ tid = request .match_info ['task_id' ]
182+ if tid in api ._result_locks :
183+ # it's a TASK
184+ api ._result_values [tid ] = await lua_json (request )
185+ api ._result_locks [tid ].set ()
186+ elif tid in api ._result_queues :
187+ # it's a QUEUE
188+ await api ._result_queues [tid ].put (await lua_json (request ))
189+ # otherwise just ignore
190+ return web .Response (text = '' )
191+
192+ @staticmethod
193+ def backdoor (request ):
194+ with open (LUA_FILE , 'r' ) as f :
195+ fcont = f .read ()
196+ fcont = fcont .replace (
197+ "local url = 'http://127.0.0.1:4343/'" ,
198+ "local url = '{}://{}/'" .format (request .scheme , request .host )
199+ )
200+ return web .Response (text = fcont )
201+
202+ def initialize (self , source_module ):
203+ self ['source_module' ] = source_module
204+ self ['exchange' ] = {}
205+ self .router .add_get ('/' , self .backdoor )
206+ self .router .add_post ('/start/{turtle}/{program}/' , self .start )
207+ self .router .add_post ('/gettask/{turtle}/' , self .gettask )
208+ self .router .add_post ('/taskresult/{turtle}/{task_id}/' , self .taskresult )
209+
210+
247211def main ():
248212 # enable_request_logging()
249213
250214 parser = argparse .ArgumentParser ()
215+ parser .add_argument ('module' , help = 'Module used as source for programs' )
251216 parser .add_argument ('--host' )
252217 parser .add_argument ('--port' , type = int )
253218 args = parser .parse_args ()
@@ -258,12 +223,8 @@ def main():
258223 if args .port is not None :
259224 app_kw ['port' ] = args .port
260225
261- asyncio .ensure_future (module_reloader ())
262- app = web .Application ()
263- app .router .add_get ('/' , backdoor )
264- app .router .add_post ('/start/{turtle}/{program}/' , start )
265- app .router .add_post ('/gettask/{turtle}/' , gettask )
266- app .router .add_post ('/taskresult/{turtle}/{task_id}/' , taskresult )
226+ app = CCApplication ()
227+ app .initialize (args .module )
267228 web .run_app (app , ** app_kw )
268229
269230
0 commit comments