Skip to content

Commit 55d93f0

Browse files
authored
Feature/ml streaming base (#115)
* basic definition of `Communicator` interface * basic definition of `Compressor` interface * basic definition of `Encryptor` interface * `Compressor`, `Encryptor` and `Communicator` interfaces * `PymiloClient` basic init * refactor list deserialization * add "\n" between functions and class declaration * update interfaces, make Communicator interface stateful * implement `RESTClientCommunicator` * implement `RESTServerCommunicator` * remove `DummyCommunicator` * implement PyMilo REST Client * implement PyMilo REST Server * add `streaming` folder to pymilo package installations * add script to run a test pymilo REST server * implement a cohesive scenario1 as a test for ml streaming feature * implement a cohesive scenario2 as a test for ml streaming feature * add fastapi and uvicorn to requirments * add requests to requirments * update threshold to be able to have uvicorn in python 3.6 * add timeout to post & get calls * `CHANGELOG.md` updated * add `__init__.py` to `streaming` module * split pymilo installation into 2 different modes, `basic` mode not having ml streaming features and `streaming` mode as a full version * update docstring in `setup.py` * add `__doc__` to attribute return + some refactorings applied * rename `Communicator` interface to `ClientCommunicator` * install the full version of pymilo in workflow * remove trailing whitespaces + unused lines * replace type check with type() with isinstance() * remove whitespaces * fix 500 error for /docs and /redoc * make `ml streaming` testcases `deployable` next to other pytest testcases * downgrade uvicorn + fastapi versions to fix python3.7 issue * fix python3.7 and python3.6 issue in windows * remove body modifier Middleware * add `parser` instead of body modifier Middleware * add session + Retry policy in `RESTClientCommunicator` * replace `python` with `executable` * remove unnecessary `pass` from @AbstractMethod * rewrite requirements installation * enhance the cohesiveness of the `execute_model` function output * add dictionary mode to handle different installations * replace return "Error" with raising exception * update `toggle_mode` syntax
1 parent 1a75629 commit 55d93f0

17 files changed

+445
-23
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
- name: Installation
2727
run: |
2828
python -m pip install --upgrade pip
29-
pip install .
29+
pip install .["streaming"]
3030
- name: Test requirements Installation
3131
run: |
3232
python otherfiles/requirements-splitter.py

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,19 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
66

77
## [Unreleased]
88
### Added
9+
- `PymiloServer` class in `streaming.pymilo_server.py`
10+
- `PymiloClient` class in `streaming.pymilo_client.py`
11+
- `Communicator` interface in `streaming.interfaces.py`
12+
- `RESTClientCommunicator` class in `streaming.communicator.py`
13+
- `RESTServerCommunicator` class in `streaming.communicator.py`
14+
- `Compressor` interface in `streaming.interfaces.py`
15+
- `DummyCompressor` class in `streaming.compressor.py`
16+
- `Encryptor` interface in `streaming.interfaces.py`
17+
- `DummyEncryptor` class in `streaming.encryptor.py`
18+
- `ML Streaming` RESTful testcases
919
### Changed
20+
- `serialize` function in `GeneralDataStructureTransporter` Transporter refactored
21+
- `get_deserialized_list` function in `GeneralDataStructureTransporter` Transporter refactored
1022
- `Export` class call by reference bug fixed
1123
## [0.9] - 2024-07-01
1224
### Added

dev-requirements.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
numpy==2.0.1
22
scikit-learn==1.5.1
33
scipy>=0.19.1
4-
requests>=2.0.0
4+
uvicorn==0.14.0
5+
fastapi==0.68.0
6+
requests==2.0.0
57
setuptools>=40.8.0
68
vulture>=1.0
79
bandit>=1.5.1

pymilo/streaming/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# -*- coding: utf-8 -*-
2+
"""PyMilo ML Streaming."""

pymilo/streaming/communicator.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import uvicorn
2+
import requests
3+
from fastapi import FastAPI, Request
4+
from pydantic import BaseModel
5+
from .interfaces import ClientCommunicator
6+
7+
8+
class RESTClientCommunicator(ClientCommunicator):
9+
10+
def __init__(self, server_url):
11+
self._server_url = server_url
12+
self.session = requests.Session()
13+
retries = requests.adapters.Retry(
14+
total=5,
15+
backoff_factor=0.1,
16+
status_forcelist=[500, 502, 503, 504]
17+
)
18+
self.session.mount('http://', requests.adapters.HTTPAdapter(max_retries=retries))
19+
self.session.mount('https://', requests.adapters.HTTPAdapter(max_retries=retries))
20+
21+
def download(self, payload):
22+
return self.session.get(url=self._server_url + "/download/", json=payload, timeout=5)
23+
24+
def upload(self, payload):
25+
return self.session.post(url=self._server_url + "/upload/", json=payload, timeout=5)
26+
27+
def attribute_call(self, payload):
28+
return self.session.post(url=self._server_url + "/attribute_call/", json=payload, timeout=5)
29+
30+
31+
32+
class RESTServerCommunicator():
33+
34+
def __init__(
35+
self,
36+
ps,
37+
host: str = "127.0.0.1",
38+
port: int = 8000,
39+
):
40+
self.app = FastAPI()
41+
self.host = host
42+
self.port = port
43+
self._ps = ps
44+
self.setup_routes()
45+
46+
def setup_routes(self):
47+
class StandardPayload(BaseModel):
48+
client_id: str
49+
model_id: str
50+
class DownloadPayload(StandardPayload):
51+
pass
52+
class UploadPayload(StandardPayload):
53+
model: str
54+
class AttributePayload(StandardPayload):
55+
attribute: str
56+
args: list
57+
kwargs: dict
58+
59+
@self.app.get("/download/")
60+
async def download(request: Request):
61+
body = await request.json()
62+
body = self.parse(body)
63+
payload = DownloadPayload(**body)
64+
message = "/download request from client: {} for model: {}".format(payload.client_id, payload.model_id)
65+
return {
66+
"message": message,
67+
"payload": self._ps.export_model(),
68+
}
69+
70+
@self.app.post("/upload/")
71+
async def upload(request: Request):
72+
body = await request.json()
73+
body = self.parse(body)
74+
payload = UploadPayload(**body)
75+
message = "/upload request from client: {} for model: {}".format(payload.client_id, payload.model_id)
76+
return {
77+
"message": message,
78+
"payload": self._ps.update_model(payload.model)
79+
}
80+
81+
@self.app.post("/attribute_call/")
82+
async def attribute_call(request: Request):
83+
body = await request.json()
84+
body = self.parse(body)
85+
payload = AttributePayload(**body)
86+
message = "/attribute_call request from client: {} for model: {}".format(payload.client_id, payload.model_id)
87+
result = self._ps.execute_model(payload)
88+
return {
89+
"message": message,
90+
"payload": result if result is not None else "The ML model has been updated in place."
91+
}
92+
93+
def parse(self, body):
94+
return self._ps._compressor.extract(
95+
self._ps._encryptor.decrypt(
96+
body
97+
)
98+
)
99+
100+
def run(self):
101+
uvicorn.run(self.app, host=self.host, port=self.port)

pymilo/streaming/compressor.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from .interfaces import Compressor
2+
3+
4+
class DummyCompressor(Compressor):
5+
6+
@staticmethod
7+
def compress(payload):
8+
return payload
9+
10+
@staticmethod
11+
def extract(payload):
12+
return payload

pymilo/streaming/encryptor.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from .interfaces import Encryptor
2+
3+
4+
class DummyEncryptor(Encryptor):
5+
6+
@staticmethod
7+
def encrypt(payload):
8+
return payload
9+
10+
@staticmethod
11+
def decrypt(payload):
12+
return payload

pymilo/streaming/interfaces.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# -*- coding: utf-8 -*-
2+
"""PyMilo ML Streaming Interfaces."""
3+
from abc import ABC, abstractmethod
4+
5+
6+
class Compressor(ABC):
7+
8+
@abstractmethod
9+
def compress(payload):
10+
"""
11+
"""
12+
13+
@abstractmethod
14+
def extract(payload):
15+
"""
16+
"""
17+
18+
19+
class Encryptor(ABC):
20+
21+
@abstractmethod
22+
def encrypt(payload):
23+
"""
24+
"""
25+
26+
@abstractmethod
27+
def decrypt(payload):
28+
"""
29+
"""
30+
31+
32+
class ClientCommunicator(ABC):
33+
34+
@abstractmethod
35+
def upload(self, payload):
36+
"""
37+
"""
38+
39+
@abstractmethod
40+
def download(self, payload):
41+
"""
42+
"""
43+
44+
@abstractmethod
45+
def attribute_call(self, payload):
46+
"""
47+
"""

pymilo/streaming/pymilo_client.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
from .encryptor import DummyEncryptor
2+
from .compressor import DummyCompressor
3+
from ..pymilo_obj import Export, Import
4+
from .communicator import RESTClientCommunicator
5+
from ..transporters.general_data_structure_transporter import GeneralDataStructureTransporter
6+
class PymiloClient:
7+
8+
def __init__(
9+
self,
10+
model=None,
11+
mode="LOCAL",
12+
server="http://127.0.0.1",
13+
port= 8000
14+
):
15+
self._client_id = "0x_client_id"
16+
self._model_id = "0x_model_id"
17+
self._model = model
18+
self._mode = mode
19+
self._compressor = DummyCompressor()
20+
self._encryptor = DummyEncryptor()
21+
self._communicator = RESTClientCommunicator(
22+
server_url="{}:{}".format(server, port)
23+
)
24+
25+
def toggle_mode(self, mode="LOCAL"):
26+
mode = mode.upper()
27+
if mode not in ["LOCAL", "DELEGATE"]:
28+
raise Exception("Invalid mode, the given mode should be either `LOCAL`[default] or `DELEGATE`.")
29+
self._mode = mode
30+
31+
def download(self):
32+
response = self._communicator.download({
33+
"client_id": self._client_id,
34+
"model_id": self._model_id
35+
})
36+
if response.status_code != 200:
37+
print("Remote model download failed.")
38+
print("Remote model downloaded successfully.")
39+
serialized_model = response.json()["payload"]
40+
self._model = Import(file_adr=None, json_dump=serialized_model).to_model()
41+
print("Local model updated successfully.")
42+
43+
def upload(self):
44+
response = self._communicator.upload({
45+
"client_id": self._client_id,
46+
"model_id": self._model_id,
47+
"model": Export(self._model).to_json(),
48+
})
49+
if response.status_code == 200:
50+
print("Local model uploaded successfully.")
51+
else:
52+
print("Local model upload failed.")
53+
54+
def __getattr__(self, attribute):
55+
if self._mode == "LOCAL":
56+
if attribute in dir(self._model):
57+
return getattr(self._model, attribute)
58+
else:
59+
raise AttributeError("This attribute doesn't exist either in PymiloClient or the inner ML model.")
60+
elif self._mode == "DELEGATE":
61+
gdst = GeneralDataStructureTransporter()
62+
def relayer(*args, **kwargs):
63+
print(f"Method '{attribute}' called with args: {args} and kwargs: {kwargs}")
64+
payload = {
65+
"client_id": self._client_id,
66+
"model_id": self._model_id,
67+
'attribute': attribute,
68+
'args': args,
69+
'kwargs': kwargs,
70+
}
71+
payload["args"] = gdst.serialize(payload, "args", None)
72+
payload["kwargs"] = gdst.serialize(payload, "kwargs", None)
73+
result = self._communicator.attribute_call(
74+
self._encryptor.encrypt(
75+
self._compressor.compress(
76+
payload
77+
)
78+
)
79+
).json()
80+
return gdst.deserialize(result, "payload", None)
81+
relayer.__doc__ = getattr(self._model.__class__, attribute).__doc__
82+
return relayer

pymilo/streaming/pymilo_server.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from ..pymilo_obj import Export, Import
2+
from .compressor import DummyCompressor
3+
from .encryptor import DummyEncryptor
4+
from .communicator import RESTServerCommunicator
5+
from ..transporters.general_data_structure_transporter import GeneralDataStructureTransporter
6+
7+
8+
class PymiloServer:
9+
10+
def __init__(self):
11+
self._model = None
12+
self._compressor = DummyCompressor()
13+
self._encryptor = DummyEncryptor()
14+
self._communicator = RESTServerCommunicator(ps=self)
15+
self._communicator.run()
16+
17+
def export_model(self):
18+
return Export(self._model).to_json()
19+
20+
def update_model(self, serialized_model):
21+
self._model = Import(file_adr=None, json_dump=serialized_model).to_model()
22+
23+
def execute_model(self, request):
24+
gdst = GeneralDataStructureTransporter()
25+
attribute = request.attribute
26+
retrieved_attribute = getattr(self._model, attribute, None)
27+
if retrieved_attribute is None:
28+
raise Exception("The requested attribute doesn't exist in this model.")
29+
arguments = {
30+
'args': request.args,
31+
'kwargs': request.kwargs
32+
}
33+
args = gdst.deserialize(arguments, 'args', None)
34+
kwargs = gdst.deserialize(arguments, 'kwargs', None)
35+
output = retrieved_attribute(*args, **kwargs)
36+
if isinstance(output, type(self._model)):
37+
self._model = output
38+
return None
39+
return gdst.serialize({'output': output}, 'output', None)

0 commit comments

Comments
 (0)