diff --git a/lithops/constants.py b/lithops/constants.py index 84303102..b6e07b32 100644 --- a/lithops/constants.py +++ b/lithops/constants.py @@ -111,7 +111,8 @@ 'aliyun_fc', 'oracle_f', 'k8s', - 'singularity' + 'singularity', + 'one' ] STANDALONE_BACKENDS = [ diff --git a/lithops/serverless/backends/one/__init__.py b/lithops/serverless/backends/one/__init__.py new file mode 100644 index 00000000..4b4e2274 --- /dev/null +++ b/lithops/serverless/backends/one/__init__.py @@ -0,0 +1,3 @@ +from .one import OpenNebulaBackend as ServerlessBackend + +__all__ = ['ServerlessBackend'] \ No newline at end of file diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py new file mode 100644 index 00000000..a952baab --- /dev/null +++ b/lithops/serverless/backends/one/config.py @@ -0,0 +1,42 @@ +# +# (C) Copyright Cloudlab URV 2024 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import enum + + +@enum.unique +class ServiceState(enum.Enum): + RUNNING = 2 + SCALING = 9 + COOLDOWN = 10 + + +DEFAULT_CONFIG_KEYS = { + 'runtime_timeout': 600, # Default: 10 minutes + 'runtime_memory': 512, # Default memory: 512 MB + 'max_workers': 100, + 'worker_processes': 1, +} + + +def load_config(config_data): + for key in DEFAULT_CONFIG_KEYS: + if key not in config_data['one']: + config_data['one'][key] = DEFAULT_CONFIG_KEYS[key] + + if 'rabbitmq' not in config_data: + raise Exception('RabbitMQ configuration is needed in this backend') + else: + config_data['one']['amqp_url'] = config_data['rabbitmq'].get('amqp_url', False) diff --git a/lithops/serverless/backends/one/entry_point.py b/lithops/serverless/backends/one/entry_point.py new file mode 100644 index 00000000..95aea8b5 --- /dev/null +++ b/lithops/serverless/backends/one/entry_point.py @@ -0,0 +1,144 @@ +# +# (C) Copyright Cloudlab URV 2024 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +sys.path.insert(0, '/lithops') + +import pika +import os +import time +import uuid +import json +import logging +from multiprocessing import Value, cpu_count +from threading import Thread + +from lithops.version import __version__ +from lithops.utils import setup_lithops_logger, b64str_to_dict, dict_to_b64str +from lithops.worker import function_handler +from lithops.worker.utils import get_runtime_metadata +from lithops.constants import JOBS_PREFIX +from lithops.storage.storage import InternalStorage + +logger = logging.getLogger('lithops.worker') + + +def extract_runtime_meta(payload): + logger.info(f"Lithops v{__version__} - Generating metadata") + + runtime_meta = get_runtime_metadata() + + internal_storage = InternalStorage(payload) + status_key = '/'.join([JOBS_PREFIX, payload['runtime_name'] + '.meta']) + dmpd_response_status = json.dumps(runtime_meta) + internal_storage.put_data(status_key, dmpd_response_status) + logger.info(f"Runtime metadata key {status_key}") + + +def run_job_k8s_rabbitmq(payload): + logger.info(f"Lithops v{__version__} - Starting OpenNebula worker execution") + + act_id = str(uuid.uuid4()).replace('-', '')[:12] + os.environ['__LITHOPS_ACTIVATION_ID'] = act_id + os.environ['__LITHOPS_BACKEND'] = 'one' + + function_handler(payload) + with running_jobs.get_lock(): + running_jobs.value += len(payload['call_ids']) + + logger.info("Finishing OpenNebula worker execution") + + +def manage_work_queue(ch, method, payload): + """Callback to receive the payload and run the jobs""" + logger.info("Call from lithops received.") + + message = payload + tasks = message['total_calls'] + + # If there are more tasks than cpus in the pod, we need to send a new message + if tasks <= running_jobs.value: + processes_to_start = tasks + else: + if running_jobs.value == 0: + logger.info("All cpus are busy. Waiting for a cpu to be free") + ch.basic_nack(delivery_tag=method.delivery_tag) + time.sleep(0.5) + return + + processes_to_start = running_jobs.value + + message_to_send = message.copy() + message_to_send['total_calls'] = tasks - running_jobs.value + message_to_send['call_ids'] = message_to_send['call_ids'][running_jobs.value:] + message_to_send['data_byte_ranges'] = message_to_send['data_byte_ranges'][running_jobs.value:] + message_to_send = {'action': 'send_task', 'payload': dict_to_b64str(message_to_send)} + message['call_ids'] = message['call_ids'][:running_jobs.value] + message['data_byte_ranges'] = message['data_byte_ranges'][:running_jobs.value] + + ch.basic_publish( + exchange='', + routing_key='task_queue', + body=json.dumps(message_to_send), + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + )) + + logger.info(f"Starting {processes_to_start} processes") + + message['worker_processes'] = running_jobs.value + with running_jobs.get_lock(): + running_jobs.value -= processes_to_start + + Thread(target=run_job_k8s_rabbitmq, args=([message])).start() + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +def actions_switcher(ch, method, properties, body): + message = json.loads(body) + action = message['action'] + encoded_payload = message['payload'] + + payload = b64str_to_dict(encoded_payload) + setup_lithops_logger(payload.get('log_level', 'INFO')) + + logger.info(f"Action {action} received from lithops.") + + if action == 'get_metadata': + extract_runtime_meta(payload) + ch.basic_ack(delivery_tag=method.delivery_tag) + + elif action == 'send_task': + manage_work_queue(ch, method, payload) + + +if __name__ == '__main__': + # Shared variable to track completed jobs + running_jobs = Value('i', cpu_count()) + + # Connect to rabbitmq + params = pika.URLParameters(sys.argv[1]) + connection = pika.BlockingConnection(params) + channel = connection.channel() + channel.queue_declare(queue='task_queue', durable=True) + channel.basic_qos(prefetch_count=1) + + # Start listening to the new job + channel.basic_consume(queue='task_queue', on_message_callback=actions_switcher) + + logger.info("Listening to rabbitmq...") + channel.start_consuming() diff --git a/lithops/serverless/backends/one/gate.py b/lithops/serverless/backends/one/gate.py new file mode 100644 index 00000000..c0b85995 --- /dev/null +++ b/lithops/serverless/backends/one/gate.py @@ -0,0 +1,68 @@ +import os + +import requests + + +class OneGateError(Exception): + """General exception for OneGate-related errors.""" + + def __init__(self, message, status_code=None): + super().__init__(message) + self.status_code = status_code + + +class OneGateClient: + def __init__(self): + self.endpoint = os.getenv( + "ONEGATE_ENDPOINT", self.get_config("ONEGATE_ENDPOINT") + ) + self.token = self.read_file("/mnt/context/token.txt") + self.vm_id = self.get_config("VMID") + + @staticmethod + def read_file(filepath): + with open(filepath, "r") as file: + return file.read().strip() + + @staticmethod + def get_config(param, filepath="/mnt/context/context.sh"): + with open(filepath, "r") as file: + for line in file: + if line.startswith(f"{param}="): + return line.split("=", 1)[1].strip().strip("'\"") + return None + + def get(self, path): + """ + Make a GET request to OneGate API and return the JSON response. + """ + url = f"{self.endpoint}/{path}" + headers = {"X-ONEGATE-TOKEN": self.token, "X-ONEGATE-VMID": self.vm_id} + + try: + response = requests.get(url, headers=headers) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + status_code = e.response.status_code if e.response else None + raise OneGateError(f"GET request to {url} failed: {e}", status_code) + except ValueError as e: + raise OneGateError(f"Failed to parse JSON response: {e}") + + def scale(self, cardinality, role="worker"): + """ + Make a PUT request to OneGate API. + """ + url = f"{self.endpoint}/service/role/{role}" + headers = { + "X-ONEGATE-TOKEN": self.token, + "X-ONEGATE-VMID": self.vm_id, + "Content-Type": "application/json", + } + data = {"cardinality": cardinality} + try: + response = requests.put(url, headers=headers, json=data) + response.raise_for_status() + except requests.exceptions.RequestException as e: + status_code = e.response.status_code if e.response else None + raise OneGateError(f"PUT request to {url} failed: {e}", status_code) diff --git a/lithops/serverless/backends/one/lithops_client.sh b/lithops/serverless/backends/one/lithops_client.sh new file mode 100644 index 00000000..49da6159 --- /dev/null +++ b/lithops/serverless/backends/one/lithops_client.sh @@ -0,0 +1,70 @@ +#!/bin/bash + +install_python3_venv() { + echo "=== Installing python3-venv ===" + apt-get update && apt-get install -y python3-venv +} + +mount_onegate() { + mkdir /mnt/context + mount /dev/cdrom /mnt/context +} + +clone_lithops_repository() { + echo "=== Cloning the Lithops repository into /lithops ===" + git clone https://github.com/OpenNebula/lithops.git /lithops + git checkout f-748 +} + +create_virtualenv() { + echo "=== Creating a Python virtual environment in /lithops-venv ===" + python3 -m venv /lithops-venv + source /lithops-venv/bin/activate +} + +install_lithops() { + echo "=== Installing Lithops and dependencies ===" + cd /lithops + python3 setup.py install + pip install lithops[aws] + pip install --no-deps -e . +} + +setup_configuration() { + echo "=== Setting up Lithops configuration ===" + mkdir -p /etc/lithops + cat < /etc/lithops/config +lithops: + monitoring: rabbitmq + backend: one + storage: aws_s3 + +rabbitmq: + amqp_url: EDIT_ME + +one: + worker_processes: 2 + runtime_memory: 512 + runtime_timeout: 600 + runtime_cpu: 2 + amqp_url: EDIT_ME + max_workers: 3 + min_workers: 1 + autoscale: none + +aws: + region: EDIT_ME + access_key_id: EDIT_ME + secret_access_key: EDIT_ME +EOF +} + +# Main +mount_onegate +install_python3_venv +clone_lithops_repository +create_virtualenv +install_lithops +setup_configuration + +echo "=== Lithops setup and configuration completed successfully ===" \ No newline at end of file diff --git a/lithops/serverless/backends/one/lithops_serivce.json b/lithops/serverless/backends/one/lithops_serivce.json new file mode 100644 index 00000000..7e1fc3f1 --- /dev/null +++ b/lithops/serverless/backends/one/lithops_serivce.json @@ -0,0 +1,33 @@ +{ + "name": "Custom_Lithops_Service", + "deployment": "straight", + "description": "", + "roles": [ + { + "name": "lithops_client", + "cardinality": 1, + "vm_template": EDIT_ME, + "vm_template_contents": "NIC = [\n NAME = \"_NIC0\",\n NETWORK_ID = \"$Public\" ]\n ", + "min_vms": 1, + "max_vms": 1, + "elasticity_policies": [], + "scheduled_policies": [], + "cooldown": 5 + }, + { + "name": "lithops_worker", + "cardinality": 1, + "vm_template": EDIT_ME, + "vm_template_contents": "NIC = [\n NAME = \"_NIC0\",\n NETWORK_ID = \"$Public\" ]\n ", + "elasticity_policies": [], + "scheduled_policies": [], + "cooldown": 5 + } + ], + "ready_status_gate": false, + "automatic_deletion": false, + "networks": { + "Public": "M|network|Public| |id:" + }, + "registration_time": 1742379547 + } \ No newline at end of file diff --git a/lithops/serverless/backends/one/lithops_worker.sh b/lithops/serverless/backends/one/lithops_worker.sh new file mode 100644 index 00000000..69174cce --- /dev/null +++ b/lithops/serverless/backends/one/lithops_worker.sh @@ -0,0 +1,91 @@ +#!/bin/bash + +export AMQP_URL='EDIT_ME' + +install_system_dependencies() { + echo "=== Updating apt repositories and installing system dependencies ===" + apt-get update && apt-get install -y \ + build-essential \ + python3-dev \ + python3-pip \ + python3-venv \ + git \ + zip && \ + rm -rf /var/lib/apt/lists/* +} + +mount_onegate() { + mkdir /mnt/context + mount /dev/cdrom /mnt/context +} + +clone_lithops_repository() { + echo "=== Cloning the Lithops repository into /lithops ===" + git clone https://github.com/OpenNebula/lithops.git /lithops + git checkout f-748 +} + +create_virtualenv() { + echo "=== Creating a Python virtual environment in /lithops-venv ===" + python3 -m venv /lithops-venv +} + +install_python_dependencies() { + echo "=== Upgrading pip, setuptools, and six; Installing Python dependencies ===" + source /lithops-venv/bin/activate + pip install --upgrade setuptools six pip + pip install --no-cache-dir \ + boto3 \ + pika \ + flask \ + gevent \ + redis \ + requests \ + PyYAML \ + numpy \ + cloudpickle \ + ps-mem \ + tblib \ + psutil +} + +setup_service() { + echo "=== Copying entry_point.py to / ===" + cp /lithops/lithops/serverless/backends/one/entry_point.py /entry_point.py + + echo "=== Creating systemd service file ===" + cat < /etc/systemd/system/lithops.service +[Unit] +Description=Lithops Entry Point +After=network.target + +[Service] +User=root +Group=root +WorkingDirectory=/ +Environment="AMQP_URL=EDIT_ME" +ExecStart=/lithops-venv/bin/python /entry_point.py \$AMQP_URL +Restart=always +RestartSec=5 +StandardOutput=append:/var/log/lithops.log +StandardError=append:/var/log/lithops.log + +[Install] +WantedBy=multi-user.target +EOF + + echo "=== Reloading systemd, enabling, and starting the service ===" + systemctl daemon-reload + systemctl enable lithops + systemctl start lithops +} + +# Main +mount_onegate +install_system_dependencies +clone_lithops_repository +create_virtualenv +install_python_dependencies +setup_service + +echo "=== Lithops setup and service installation completed successfully ===" diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py new file mode 100644 index 00000000..42e8c0c9 --- /dev/null +++ b/lithops/serverless/backends/one/one.py @@ -0,0 +1,263 @@ +# +# (C) Copyright Cloudlab URV 2024 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from .gate import OneGateClient +from .config import ServiceState + +import os +import pika +import hashlib +import json +import logging +import copy +import time + +from lithops import utils +from lithops.version import __version__ +from lithops.constants import COMPUTE_CLI_MSG, JOBS_PREFIX + +logger = logging.getLogger(__name__) + + +class OpenNebulaBackend: + """ + A wrap-up around OpenNebula backend. + """ + + def __init__(self, one_config, internal_storage): + logger.info("Initializing OpenNebula backend") + + # Backend configuration + self.name = 'one' + self.type = utils.BackendType.BATCH.value + self.one_config = one_config + self.internal_storage = internal_storage + self.amqp_url = self.one_config['amqp_url'] + + # Init rabbitmq + params = pika.URLParameters(self.amqp_url) + self.connection = pika.BlockingConnection(params) + self.channel = self.connection.channel() + self.channel.queue_declare(queue='task_queue', durable=True) + + # OpenNebula configuration + logger.debug("Initializing OneGate python client") + self.client = OneGateClient() + + msg = COMPUTE_CLI_MSG.format('OpenNebula') + logger.info(f"{msg}") + + def _format_job_name(self, runtime_name, runtime_memory, version=__version__): + name = f'{runtime_name}-{runtime_memory}-{version}' + name_hash = hashlib.sha1(name.encode()).hexdigest()[:10] + + return f'lithops-worker-{version.replace(".", "")}-{name_hash}' + + def build_runtime(self, one_image_name, one_file, extra_args=[]): + """ + Builds a new runtime from a requirements.txt file and pushes it to an OpenNebula VM template + """ + # TODO: + # 1. Get requirement.txt file from runtime['requirements_file'] + # 2. Read the dependencies and instert them to the VM template + # 2.1. Get the VM Template basic from default configuration + # 2.2. Append dependencies in basic VM Template + # 3. Using OneGate, update the VM Template + # 4. Deploy at least 1 VM to get runtime_meta + # 5. Wait for it to be ready + pass + + def _create_default_runtime(self): + """ + Builds the default runtime + """ + pass + + def deploy_runtime(self, one_image_name, memory, timeout): + """ + Deploys a new runtime + """ + logger.info(f"Deploying fake runtime: {one_image_name}") + runtime_meta = self._generate_runtime_meta(one_image_name) + + return runtime_meta + + def delete_runtime(self, one_image_name, memory, version): + """ + Deletes a runtime + """ + pass + + def clear(self, job_keys=None): + min_nodes = int(self.one_config['min_workers']) + current_nodes = self._get_nodes() + + if current_nodes > min_nodes and self.one_config["autoscale"] in {"all", "down"}: + self._scale_one(current_nodes, min_nodes) + + def clean(self, all=False): + """ + Deletes all jobs + """ + min_nodes = int(self.one_config['min_workers']) + current_nodes = self._get_nodes() + + if current_nodes > min_nodes: + self._scale_one(current_nodes, min_nodes) + + logger.debug('Cleaning RabbitMQ queues') + self.channel.queue_delete(queue='task_queue') + + def list_runtimes(self, one_image_name='all'): + """ + List all the runtimes + return: list of tuples (one_image_name, memory) + """ + pass + + def invoke(self, one_image_name, runtime_memory, job_payload): + """ + Invoke -- return information about this invocation + For array jobs only remote_invocator is allowed + """ + job_key = job_payload['job_key'] + granularity = self.one_config['worker_processes'] + functions = job_payload['total_calls'] + + current_workers = self._get_nodes() * granularity + max_workers = int(self.one_config['max_workers']) * granularity + + if current_workers < functions and current_workers < max_workers and self.one_config["autoscale"] in {"all", "up"}: + self._scale_one(current_workers // granularity, max_workers // granularity) + current_workers = max_workers + + granularity = current_workers + times, res = divmod(functions, granularity) + + for i in range(times + (1 if res != 0 else 0)): + num_tasks = granularity if i < times else res + payload_edited = job_payload.copy() + + start_index = i * granularity + end_index = start_index + num_tasks + + payload_edited['call_ids'] = payload_edited['call_ids'][start_index:end_index] + payload_edited['data_byte_ranges'] = payload_edited['data_byte_ranges'][start_index:end_index] + payload_edited['total_calls'] = num_tasks + + message = { + 'action': 'send_task', + 'payload': utils.dict_to_b64str(payload_edited) + } + + self.channel.basic_publish( + exchange='', + routing_key='task_queue', + body=json.dumps(message), + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + )) + + activation_id = f'lithops-{job_key.lower()}' + + return activation_id + + def _get_nodes(self) -> int: + for role in self.client.get("service").get("SERVICE", {}).get("roles", []): + if "lithops_worker" in role.get("name", "").lower(): + return int(role.get("cardinality")) + + return 0 + + def _scale_one(self, nodes: int, scale_nodes: int) -> None: + service = self.client.get("service").get("SERVICE", {}) + if service.get("state") != ServiceState.RUNNING.value: + logger.info( + "Service is not in 'RUNNING' state and can not be scaled" + ) + return + logger.info(f"Scaling workers from {nodes} to {scale_nodes} nodes") + self.client.scale(scale_nodes, "lithops_worker") + + def _generate_runtime_meta(self, one_image_name): + runtime_name = self._format_job_name(one_image_name, 128) + + logger.info(f"Extracting metadata from: {one_image_name}") + + payload = copy.deepcopy(self.internal_storage.storage.config) + payload['runtime_name'] = runtime_name + payload['log_level'] = logger.getEffectiveLevel() + encoded_payload = utils.dict_to_b64str(payload) + + message = { + 'action': 'get_metadata', + 'payload': encoded_payload + } + + # Send message to RabbitMQ + self.channel.basic_publish( + exchange='', + routing_key='task_queue', + body=json.dumps(message), + properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + )) + + logger.debug("Waiting for runtime metadata") + + for i in range(0, 300): + try: + data_key = '/'.join([JOBS_PREFIX, runtime_name + '.meta']) + json_str = self.internal_storage.get_data(key=data_key) + runtime_meta = json.loads(json_str.decode("ascii")) + self.internal_storage.del_data(key=data_key) + break + except Exception: + time.sleep(2) + + if not runtime_meta or 'preinstalls' not in runtime_meta: + raise Exception(f'Failed getting runtime metadata: {runtime_meta}') + + return runtime_meta + + def get_runtime_key(self, one_image_name, runtime_memory, version=__version__): + """ + Method that creates and returns the runtime key. + Runtime keys are used to uniquely identify runtimes within the storage, + in order to know which runtimes are installed and which not. + """ + jobdef_name = self._format_job_name(one_image_name, 256, version) + runtime_key = os.path.join(self.name, version, jobdef_name) + + return runtime_key + + def get_runtime_info(self): + """ + Method that returns all the relevant information about the runtime set + in config + """ + if 'runtime' not in self.one_config or self.one_config['runtime'] == 'default': + py_version = utils.CURRENT_PY_VERSION.replace('.', '') + self.one_config['runtime'] = f'one-runtime-v{py_version}' + + runtime_info = { + 'runtime_name': self.one_config['runtime'], + 'runtime_memory': self.one_config['runtime_memory'], + 'runtime_timeout': self.one_config['runtime_timeout'], + 'max_workers': self.one_config['max_workers'], + } + + return runtime_info