3 changes: 2 additions & 1 deletion lithops/constants.py
@@ -111,7 +111,8 @@
'aliyun_fc',
'oracle_f',
'k8s',
'singularity'
'singularity',
'one'
]

STANDALONE_BACKENDS = [
3 changes: 3 additions & 0 deletions lithops/serverless/backends/one/__init__.py
@@ -0,0 +1,3 @@
from .one import OpenNebulaBackend as ServerlessBackend

__all__ = ['ServerlessBackend']
42 changes: 42 additions & 0 deletions lithops/serverless/backends/one/config.py
@@ -0,0 +1,42 @@
#
# (C) Copyright Cloudlab URV 2024
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import enum


@enum.unique
class ServiceState(enum.Enum):
RUNNING = 2
SCALING = 9
COOLDOWN = 10


DEFAULT_CONFIG_KEYS = {
'runtime_timeout': 600, # Default: 10 minutes
'runtime_memory': 512, # Default memory: 512 MB
'max_workers': 100,
'worker_processes': 1,
}


def load_config(config_data):
for key in DEFAULT_CONFIG_KEYS:
if key not in config_data['one']:
config_data['one'][key] = DEFAULT_CONFIG_KEYS[key]

if 'rabbitmq' not in config_data:
raise Exception('RabbitMQ configuration is needed in this backend')
else:
config_data['one']['amqp_url'] = config_data['rabbitmq'].get('amqp_url', False)
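
For illustration, a minimal sketch of the configuration dictionary that load_config expects; the values below are hypothetical placeholders, not defaults taken from the code:

config_data = {
    'one': {
        'worker_processes': 2              # anything missing is filled from DEFAULT_CONFIG_KEYS
    },
    'rabbitmq': {
        'amqp_url': 'amqp://user:pass@broker:5672'   # required; load_config raises without it
    }
}

load_config(config_data)
# config_data['one'] now also carries runtime_timeout, runtime_memory, max_workers
# and a copy of the RabbitMQ amqp_url.
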
144 changes: 144 additions & 0 deletions lithops/serverless/backends/one/entry_point.py
@@ -0,0 +1,144 @@
#
# (C) Copyright Cloudlab URV 2024
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
sys.path.insert(0, '/lithops')

import pika
import os
import time
import uuid
import json
import logging
from multiprocessing import Value, cpu_count
from threading import Thread

from lithops.version import __version__
from lithops.utils import setup_lithops_logger, b64str_to_dict, dict_to_b64str
from lithops.worker import function_handler
from lithops.worker.utils import get_runtime_metadata
from lithops.constants import JOBS_PREFIX
from lithops.storage.storage import InternalStorage

logger = logging.getLogger('lithops.worker')


def extract_runtime_meta(payload):
logger.info(f"Lithops v{__version__} - Generating metadata")

runtime_meta = get_runtime_metadata()

internal_storage = InternalStorage(payload)
status_key = '/'.join([JOBS_PREFIX, payload['runtime_name'] + '.meta'])
dmpd_response_status = json.dumps(runtime_meta)
internal_storage.put_data(status_key, dmpd_response_status)
logger.info(f"Runtime metadata key {status_key}")


def run_job_k8s_rabbitmq(payload):
logger.info(f"Lithops v{__version__} - Starting OpenNebula worker execution")

act_id = str(uuid.uuid4()).replace('-', '')[:12]
os.environ['__LITHOPS_ACTIVATION_ID'] = act_id
os.environ['__LITHOPS_BACKEND'] = 'one'

function_handler(payload)
with running_jobs.get_lock():
running_jobs.value += len(payload['call_ids'])

logger.info("Finishing OpenNebula worker execution")


def manage_work_queue(ch, method, payload):
"""Callback to receive the payload and run the jobs"""
logger.info("Call from lithops received.")

message = payload
tasks = message['total_calls']

    # If there are more tasks than free CPUs in this VM, split the batch and re-queue the remainder
if tasks <= running_jobs.value:
processes_to_start = tasks
else:
if running_jobs.value == 0:
logger.info("All cpus are busy. Waiting for a cpu to be free")
ch.basic_nack(delivery_tag=method.delivery_tag)
time.sleep(0.5)
return

processes_to_start = running_jobs.value

message_to_send = message.copy()
message_to_send['total_calls'] = tasks - running_jobs.value
message_to_send['call_ids'] = message_to_send['call_ids'][running_jobs.value:]
message_to_send['data_byte_ranges'] = message_to_send['data_byte_ranges'][running_jobs.value:]
message_to_send = {'action': 'send_task', 'payload': dict_to_b64str(message_to_send)}
message['call_ids'] = message['call_ids'][:running_jobs.value]
message['data_byte_ranges'] = message['data_byte_ranges'][:running_jobs.value]

ch.basic_publish(
exchange='',
routing_key='task_queue',
body=json.dumps(message_to_send),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
))

logger.info(f"Starting {processes_to_start} processes")

message['worker_processes'] = running_jobs.value
with running_jobs.get_lock():
running_jobs.value -= processes_to_start

    Thread(target=run_job_k8s_rabbitmq, args=(message,)).start()

ch.basic_ack(delivery_tag=method.delivery_tag)


def actions_switcher(ch, method, properties, body):
message = json.loads(body)
action = message['action']
encoded_payload = message['payload']

payload = b64str_to_dict(encoded_payload)
setup_lithops_logger(payload.get('log_level', 'INFO'))

logger.info(f"Action {action} received from lithops.")

if action == 'get_metadata':
extract_runtime_meta(payload)
ch.basic_ack(delivery_tag=method.delivery_tag)

elif action == 'send_task':
manage_work_queue(ch, method, payload)


if __name__ == '__main__':
    # Shared counter of free CPU slots on this worker VM
running_jobs = Value('i', cpu_count())

# Connect to rabbitmq
params = pika.URLParameters(sys.argv[1])
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)

# Start listening to the new job
channel.basic_consume(queue='task_queue', on_message_callback=actions_switcher)

logger.info("Listening to rabbitmq...")
channel.start_consuming()
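
For illustration, a rough sketch of a producer that publishes a 'send_task' message in the format actions_switcher expects; the broker URL and job fields are hypothetical placeholders, and a real payload carries the full set of fields consumed by function_handler:

import json
import pika
from lithops.utils import dict_to_b64str

job_payload = {
    'log_level': 'INFO',
    'total_calls': 4,
    'call_ids': ['00000', '00001', '00002', '00003'],
    'data_byte_ranges': [[0, 99], [100, 199], [200, 299], [300, 399]],
    # ... remaining fields required by function_handler()
}

connection = pika.BlockingConnection(pika.URLParameters('amqp://user:pass@broker:5672'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=json.dumps({'action': 'send_task', 'payload': dict_to_b64str(job_payload)}),
    properties=pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE))
connection.close()
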
68 changes: 68 additions & 0 deletions lithops/serverless/backends/one/gate.py
@@ -0,0 +1,68 @@
import os

import requests


class OneGateError(Exception):
"""General exception for OneGate-related errors."""

def __init__(self, message, status_code=None):
super().__init__(message)
self.status_code = status_code


class OneGateClient:
def __init__(self):
self.endpoint = os.getenv(
"ONEGATE_ENDPOINT", self.get_config("ONEGATE_ENDPOINT")
)
self.token = self.read_file("/mnt/context/token.txt")
self.vm_id = self.get_config("VMID")

@staticmethod
def read_file(filepath):
with open(filepath, "r") as file:
return file.read().strip()

@staticmethod
def get_config(param, filepath="/mnt/context/context.sh"):
with open(filepath, "r") as file:
for line in file:
if line.startswith(f"{param}="):
return line.split("=", 1)[1].strip().strip("'\"")
return None

def get(self, path):
"""
Make a GET request to OneGate API and return the JSON response.
"""
url = f"{self.endpoint}/{path}"
headers = {"X-ONEGATE-TOKEN": self.token, "X-ONEGATE-VMID": self.vm_id}

try:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
            status_code = e.response.status_code if e.response is not None else None
raise OneGateError(f"GET request to {url} failed: {e}", status_code)
except ValueError as e:
raise OneGateError(f"Failed to parse JSON response: {e}")

def scale(self, cardinality, role="worker"):
"""
Make a PUT request to OneGate API.
"""
url = f"{self.endpoint}/service/role/{role}"
headers = {
"X-ONEGATE-TOKEN": self.token,
"X-ONEGATE-VMID": self.vm_id,
"Content-Type": "application/json",
}
data = {"cardinality": cardinality}
try:
response = requests.put(url, headers=headers, json=data)
response.raise_for_status()
except requests.exceptions.RequestException as e:
            status_code = e.response.status_code if e.response is not None else None
raise OneGateError(f"PUT request to {url} failed: {e}", status_code)
70 changes: 70 additions & 0 deletions lithops/serverless/backends/one/lithops_client.sh
@@ -0,0 +1,70 @@
#!/bin/bash

install_python3_venv() {
echo "=== Installing python3-venv ==="
apt-get update && apt-get install -y python3-venv
}

mount_onegate() {
    mkdir -p /mnt/context
mount /dev/cdrom /mnt/context
}

clone_lithops_repository() {
echo "=== Cloning the Lithops repository into /lithops ==="
git clone https://github.com/OpenNebula/lithops.git /lithops
    git -C /lithops checkout f-748
}

create_virtualenv() {
echo "=== Creating a Python virtual environment in /lithops-venv ==="
python3 -m venv /lithops-venv
source /lithops-venv/bin/activate
}

install_lithops() {
echo "=== Installing Lithops and dependencies ==="
cd /lithops
python3 setup.py install
    pip install "lithops[aws]"
pip install --no-deps -e .
}

setup_configuration() {
echo "=== Setting up Lithops configuration ==="
mkdir -p /etc/lithops
cat <<EOF > /etc/lithops/config
lithops:
monitoring: rabbitmq
backend: one
storage: aws_s3

rabbitmq:
amqp_url: EDIT_ME

one:
worker_processes: 2
runtime_memory: 512
runtime_timeout: 600
runtime_cpu: 2
amqp_url: EDIT_ME
max_workers: 3
min_workers: 1
autoscale: none

aws:
region: EDIT_ME
access_key_id: EDIT_ME
secret_access_key: EDIT_ME
EOF
}

# Main
mount_onegate
install_python3_venv
clone_lithops_repository
create_virtualenv
install_lithops
setup_configuration

echo "=== Lithops setup and configuration completed successfully ==="
33 changes: 33 additions & 0 deletions lithops/serverless/backends/one/lithops_serivce.json
@@ -0,0 +1,33 @@
{
"name": "Custom_Lithops_Service",
"deployment": "straight",
"description": "",
"roles": [
{
"name": "lithops_client",
"cardinality": 1,
"vm_template": EDIT_ME,
"vm_template_contents": "NIC = [\n NAME = \"_NIC0\",\n NETWORK_ID = \"$Public\" ]\n ",
"min_vms": 1,
"max_vms": 1,
"elasticity_policies": [],
"scheduled_policies": [],
"cooldown": 5
},
{
"name": "lithops_worker",
"cardinality": 1,
"vm_template": EDIT_ME,
"vm_template_contents": "NIC = [\n NAME = \"_NIC0\",\n NETWORK_ID = \"$Public\" ]\n ",
"elasticity_policies": [],
"scheduled_policies": [],
"cooldown": 5
}
],
"ready_status_gate": false,
"automatic_deletion": false,
"networks": {
"Public": "M|network|Public| |id:"
},
"registration_time": 1742379547
}