Skip to content

Commit 8f4fa64

Browse files
committed
Use asyncio in ModelWrapper and adapt API
After moving to aiohttp it is easy to move the calls to the underlying methods to coroutines using asyncio. In order to do so we convert the ModelWrapper to use async methods, and then we adapt the API to these changes. As a side effect, we change the way we are defining the routes and views, since we need to include a configuration option defining the number of worker instances that we want to spawn. We also implement a new warm method that can be defined by the user, so that this method is called before the API is spawned. Sem-Ver: api-break
1 parent 8cc787f commit 8f4fa64

File tree

17 files changed

+313
-176
lines changed

17 files changed

+313
-176
lines changed

deepaas/api/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
CONF = cfg.CONF
3333

3434

35-
def get_app(doc="/docs"):
35+
async def get_app(doc="/docs"):
3636
"""Get the main app."""
3737
global APP
3838

@@ -66,6 +66,10 @@ def get_app(doc="/docs"):
6666

6767
LOG.info("Serving loaded V2 models: %s", list(model.V2_MODELS.keys()))
6868

69+
for _, m in model.V2_MODELS.items():
70+
LOG.debug("Warming models...")
71+
await m.warm()
72+
6973
if doc:
7074
# init docs with all parameters, usual for ApiSpec
7175
aiohttp_apispec.setup_aiohttp_apispec(

deepaas/api/v2/__init__.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,10 @@
2424
from deepaas.api.v2 import predict as v2_predict
2525
from deepaas.api.v2 import responses
2626
from deepaas.api.v2 import train as v2_train
27-
from deepaas import model
2827

2928
CONF = cfg.CONF
3029
LOG = log.getLogger("deepaas.api.v2")
3130

32-
# Get the models (this is a singleton, so it is safe to call it multiple times
33-
model.register_v2_models()
34-
3531
APP = None
3632

3733

@@ -51,10 +47,10 @@ def get_app():
5147
v2_debug.setup_debug()
5248

5349
APP.router.add_get('/', get_version, name="v2")
54-
APP.add_routes(v2_debug.routes)
55-
APP.add_routes(v2_model.routes)
56-
APP.add_routes(v2_train.routes)
57-
APP.add_routes(v2_predict.routes)
50+
v2_debug.setup_routes(APP)
51+
v2_model.setup_routes(APP)
52+
v2_train.setup_routes(APP)
53+
v2_predict.setup_routes(APP)
5854

5955
return APP
6056

deepaas/api/v2/debug.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,8 @@
2525
from oslo_log import log
2626
import six
2727

28-
from deepaas import model
29-
3028
CONF = cfg.CONF
3129

32-
# Get the models (this is a singleton, so it is safe to call it multiple times
33-
model.register_v2_models()
34-
3530
app = web.Application()
3631
routes = web.RouteTableDef()
3732

@@ -85,10 +80,13 @@ def setup_debug():
8580
summary="""Return debug information if enabled by API.""",
8681
description="""Return debug information if enabled by API.""",
8782
)
88-
@routes.get('/debug')
89-
async def get(self):
83+
async def get(request):
9084
print("--- DEBUG MARKER %s ---" % datetime.datetime.now())
9185
resp = ""
9286
if DEBUG_STREAM is not None:
9387
resp = DEBUG_STREAM.getvalue()
9488
return web.Response(text=resp)
89+
90+
91+
def setup_routes(app):
92+
app.router.add_get("/debug", get)

deepaas/api/v2/models.py

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,6 @@
2020
from deepaas.api.v2 import responses
2121
from deepaas import model
2222

23-
# Get the models (this is a singleton, so it is safe to call it multiple times
24-
model.register_v2_models()
25-
26-
app = web.Application()
27-
routes = web.RouteTableDef()
28-
2923

3024
@aiohttp_apispec.docs(
3125
tags=["models"],
@@ -35,9 +29,8 @@
3529
"will return the loaded models, as long as their basic "
3630
"metadata.",
3731
)
38-
@routes.get('/models')
3932
@aiohttp_apispec.response_schema(responses.ModelMeta(), 200)
40-
async def get(request):
33+
async def index(request):
4134
"""Return loaded models and its information.
4235
4336
DEEPaaS can load several models and server them on the same endpoint,
@@ -61,30 +54,40 @@ async def get(request):
6154
return web.json_response({"models": models})
6255

6356

64-
# In the next lines we iterate over the loaded models and create the different
65-
# resources for each model. This way we can also load the expected parameters
66-
# if needed (as in the training method).
67-
for model_name, model_obj in model.V2_MODELS.items():
68-
@routes.view('/models/%s' % model_name)
69-
class BaseModel(web.View):
70-
model_name = model_name
71-
model_obj = model_obj
57+
class Handler(object):
58+
model_name = None
59+
model_obj = None
60+
61+
def __init__(self, model_name, model_obj):
62+
self.model_name = model_name
63+
self.model_obj = model_obj
64+
65+
@aiohttp_apispec.docs(
66+
tags=["models"],
67+
summary="Return model's metadata",
68+
)
69+
@aiohttp_apispec.response_schema(responses.ModelMeta(), 200)
70+
async def get(self, request):
71+
m = {
72+
"id": self.model_name,
73+
"name": self.model_name,
74+
"links": [{
75+
"rel": "self",
76+
"href": "%s" % request.path,
77+
}]
78+
}
79+
meta = self.model_obj.get_metadata()
80+
m.update(meta)
81+
82+
return web.json_response(m)
83+
7284

73-
@aiohttp_apispec.docs(
74-
tags=["models"],
75-
summary="Return '%s' model metadata" % model_name,
76-
)
77-
@aiohttp_apispec.response_schema(responses.ModelMeta(), 200)
78-
async def get(self):
79-
m = {
80-
"id": self.model_name,
81-
"name": self.model_name,
82-
"links": [{
83-
"rel": "self",
84-
"href": "%s" % self.request.path,
85-
}]
86-
}
87-
meta = self.model_obj.get_metadata()
88-
m.update(meta)
85+
def setup_routes(app):
86+
app.router.add_get("/models", index)
8987

90-
return web.json_response(m)
88+
# In the next lines we iterate over the loaded models and create the
89+
# different resources for each model. This way we can also load the
90+
# expected parameters if needed (as in the training method).
91+
for model_name, model_obj in model.V2_MODELS.items():
92+
hdlr = Handler(model_name, model_obj)
93+
app.router.add_get("/models/%s" % model_name, hdlr.get)

deepaas/api/v2/predict.py

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,6 @@
2222
from deepaas.api.v2 import responses
2323
from deepaas import model
2424

25-
# Get the models (this is a singleton, so it is safe to call it multiple times
26-
model.register_v2_models()
27-
28-
app = web.Application()
29-
routes = web.RouteTableDef()
30-
3125

3226
def _get_model_response(model_name, model_obj):
3327
response_schema = model_obj.response_schema
@@ -38,33 +32,38 @@ def _get_model_response(model_name, model_obj):
3832
return responses.Prediction
3933

4034

41-
# In the next lines we iterate over the loaded models and create different
42-
# views for the different models. We take the corresponding arguments and
43-
# responses from the underlying model, adding them to the OpenAPI spec and
44-
# documentation.
45-
for model_name, model_obj in model.V2_MODELS.items():
46-
47-
args = webargs.core.dict2schema(model_obj.get_predict_args())
48-
response = _get_model_response(model_name, model_obj)
49-
50-
@routes.view('/models/%s/predict' % model_name)
51-
class ModelPredict(web.View):
52-
model_name = model_name
53-
model_obj = model_obj
54-
55-
@aiohttp_apispec.docs(
56-
tags=["models"],
57-
summary="Make a prediction given the input data"
58-
)
59-
@aiohttp_apispec.querystring_schema(args)
60-
@aiohttp_apispec.response_schema(response(), 200)
61-
@aiohttp_apispec.response_schema(responses.Failure(), 400)
62-
@aiohttpparser.parser.use_args(args)
63-
async def post(self, args):
64-
ret = self.model_obj.predict(**args)
65-
66-
if self.model_obj.has_schema:
67-
self.model_obj.validate_response(ret)
68-
return web.json_response(ret)
69-
70-
return web.json_response({"status": "OK", "predictions": ret})
35+
def setup_routes(app):
36+
# In the next lines we iterate over the loaded models and create the
37+
# different resources for each model. This way we can also load the
38+
# expected parameters if needed (as in the training method).
39+
for model_name, model_obj in model.V2_MODELS.items():
40+
args = webargs.core.dict2schema(model_obj.get_predict_args())
41+
response = _get_model_response(model_name, model_obj)
42+
43+
class Handler(object):
44+
model_name = None
45+
model_obj = None
46+
47+
def __init__(self, model_name, model_obj):
48+
self.model_name = model_name
49+
self.model_obj = model_obj
50+
51+
@aiohttp_apispec.docs(
52+
tags=["models"],
53+
summary="Make a prediction given the input data"
54+
)
55+
@aiohttp_apispec.querystring_schema(args)
56+
@aiohttp_apispec.response_schema(response(), 200)
57+
@aiohttp_apispec.response_schema(responses.Failure(), 400)
58+
@aiohttpparser.parser.use_args(args)
59+
async def post(self, request, args):
60+
ret = await self.model_obj.predict(**args)
61+
62+
if self.model_obj.has_schema:
63+
self.model_obj.validate_response(ret)
64+
return web.json_response(ret)
65+
66+
return web.json_response({"status": "OK", "predictions": ret})
67+
68+
hdlr = Handler(model_name, model_obj)
69+
app.router.add_post("/models/%s/predict" % model_name, hdlr.post)

deepaas/api/v2/train.py

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,33 +21,33 @@
2121

2222
from deepaas import model
2323

24-
# Get the models (this is a singleton, so it is safe to call it multiple times
25-
model.register_v2_models()
26-
27-
app = web.Application()
28-
routes = web.RouteTableDef()
29-
30-
31-
# In the next lines we iterate over the loaded models and create different
32-
# views for the different models. We take the corresponding arguments and
33-
# responses from the underlying model, adding them to the OpenAPI spec and
34-
# documentation.
35-
for model_name, model_obj in model.V2_MODELS.items():
36-
args = webargs.core.dict2schema(model_obj.get_train_args())
37-
38-
@routes.view('/models/%s/train' % model_name)
39-
class ModelTrain(web.View):
40-
model_name = model_name
41-
model_obj = model_obj
42-
43-
@aiohttp_apispec.docs(
44-
tags=["models"],
45-
summary="Retrain model with available data"
46-
)
47-
@aiohttp_apispec.querystring_schema(args)
48-
@aiohttpparser.parser.use_args(args)
49-
async def put(self, args):
50-
ret = self.model_obj.train(**args)
51-
# FIXME(aloga): what are we returning here? We need to take care
52-
# of these responses as well.
53-
return web.json_response(ret)
24+
25+
def setup_routes(app):
26+
# In the next lines we iterate over the loaded models and create the
27+
# different resources for each model. This way we can also load the
28+
# expected parameters if needed (as in the training method).
29+
for model_name, model_obj in model.V2_MODELS.items():
30+
args = webargs.core.dict2schema(model_obj.get_train_args())
31+
32+
class Handler(object):
33+
model_name = None
34+
model_obj = None
35+
36+
def __init__(self, model_name, model_obj):
37+
self.model_name = model_name
38+
self.model_obj = model_obj
39+
40+
@aiohttp_apispec.docs(
41+
tags=["models"],
42+
summary="Retrain model with available data"
43+
)
44+
@aiohttp_apispec.querystring_schema(args)
45+
@aiohttpparser.parser.use_args(args)
46+
async def post(self, request, args):
47+
ret = await self.model_obj.train(**args)
48+
# FIXME(aloga): what are we returning here? We need to take
49+
# care of these responses as well.
50+
return web.json_response(ret)
51+
52+
hdlr = Handler(model_name, model_obj)
53+
app.router.add_post("/models/%s/train" % model_name, hdlr.post)

deepaas/config.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@
4141
print to the standard output and error (i.e. stdout and stderr) through the
4242
"/debug" endpoint. Default is to not provide this information. This will not
4343
provide logging information about the API itself.
44+
"""),
45+
cfg.IntOpt('model-workers',
46+
short='n',
47+
default=1,
48+
help="""
49+
Specify the number of workers *per model* that we will initialize. If using a
50+
CPU you probably want to increase this number, if using a GPU probably you want
51+
to leave it to 1. (defaults to 1)
4452
"""),
4553
]
4654

deepaas/model/v2/base.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,15 @@ def get_train_args():
230230
application required arguments.
231231
"""
232232
raise NotImplementedError()
233+
234+
@abc.abstractmethod
235+
def warm(self):
236+
"""Warm (initialize, load) the model.
237+
238+
This is called when the model is loaded, before the API is spawned.
239+
240+
If implemented, it should prepare the model for execution. This is
241+
useful for loading it into memory, perform any kind of preliminary
242+
checks, etc.
243+
"""
244+
raise NotImplementedError()

deepaas/model/v2/test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ class TestModel(base.BaseModel):
4545
),
4646
}
4747

48+
def warm(self):
49+
LOG.debug("Test model is warming...")
50+
4851
def predict(self, **kwargs):
4952
LOG.debug("Got the following kw arguments: %s", kwargs)
5053
return {

0 commit comments

Comments
 (0)