3
3
render_fast_api_tsrct_backend )
4
4
from piper .base .docker import PythonImage
5
5
from piper .configurations import get_configuration
6
- from piper .envs import get_env , is_current_env , is_docker_env
6
+ from piper .envs import get_env , is_current_env , is_docker_env , Env
7
7
from piper .utils import docker_utils
8
+ from piper .utils .logger_utils import logger
9
+ from piper .base .executors import HTTPExecutor
8
10
11
+ import asyncio
9
12
import inspect
10
13
import sys
11
14
import time
18
21
import requests
19
22
from pydantic import BaseModel # , BytesObject, ListOfStringsObject
20
23
21
- from piper .utils .logger_utils import logger
22
-
23
-
24
- class BaseExecutor :
25
- pass
26
-
27
-
28
- class LocalExecutor :
29
- pass
30
-
31
-
32
- def is_known (obj ):
33
- basic = obj .__class__ .__name__ in {'dict' , 'list' , 'tuple' , 'str' , 'int' , 'float' , 'bool' }
34
- models = isinstance (obj , (BaseModel ,))
35
- return basic or models
36
-
37
-
38
- def prepare (obj ):
39
- if isinstance (obj , (BaseModel ,)):
40
- return obj .dict ()
41
- return obj
42
-
43
-
44
- def inputs_to_dict (* args , ** kwargs ):
45
- from_args = {}
46
- for arg in args :
47
- if is_known (arg ):
48
- from_args .update (prepare (arg ))
49
- from_kwargs = {k : prepare (v ) for k , v in kwargs .items () if is_known (v )}
50
- from_args .update (from_kwargs )
51
- return from_args
52
-
53
24
54
25
def add_packages_to_install (packages_list ):
55
26
row = f'RUN apt install -y { " " .join (packages_list )} \n '
@@ -60,32 +31,6 @@ def add_row(row):
60
31
return f'{ row } \n '
61
32
62
33
63
- class HTTPExecutor (BaseExecutor ):
64
-
65
- def __init__ (self , host : str , port : int , base_handler : str ):
66
- self .host = host
67
- self .port = port
68
-
69
- @abstractmethod
70
- async def run (self , * args , ** kwargs ):
71
- pass
72
-
73
- async def __call__ (self , * args , ** kwargs ):
74
- logger .info (f'get_env() { get_env ()} ' )
75
- logger .info (f'is_current_env() { is_current_env ()} ' )
76
- if is_current_env ():
77
- return await self .run (* args , ** kwargs )
78
- else :
79
- function = "run"
80
- request_dict = inputs_to_dict (* args , ** kwargs )
81
- logger .info (f'request_dict is { request_dict } ' )
82
- async with aiohttp .ClientSession () as session :
83
- url = f'http://{ self .host } :{ self .port } /{ function } '
84
- logger .info (f'run function with url { url } and data { request_dict } ' )
85
- async with session .post (url , json = request_dict ) as resp :
86
- return await resp .json ()
87
-
88
-
89
34
def copy_piper (path : str ):
90
35
cfg = get_configuration ()
91
36
copy_tree (cfg .piper_path , f"{ path } piper" )
@@ -162,7 +107,8 @@ class FastAPIExecutor(HTTPExecutor):
162
107
def __init__ (self , port : int = 8080 , ** service_kwargs ):
163
108
self .container = None
164
109
self .image_tag = 'piper:latest'
165
- self .container_name = "piper_FastAPI"
110
+ self .id = hash (self )
111
+ self .container_name = f"piper_FastAPI_{ self .id } "
166
112
167
113
if is_docker_env ():
168
114
docker_client = docker .DockerClient (base_url = 'unix://var/run/docker.sock' )
@@ -194,6 +140,14 @@ def __init__(self, port: int = 8080, **service_kwargs):
194
140
195
141
super ().__init__ ('localhost' , port , self .base_handler )
196
142
143
+ async def aio_call (self , * args , ** kwargs ):
144
+ return await super ().__call__ (* args , ** kwargs )
145
+
146
+ def __call__ (self , * args , ** kwargs ):
147
+ loop = asyncio .get_event_loop ()
148
+ result = loop .run_until_complete (self .aio_call (* args , ** kwargs ))
149
+ return result
150
+
197
151
def rm_container (self ):
198
152
if self .container :
199
153
self .container .remove (force = True )
0 commit comments