Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 138 additions & 15 deletions veadk/cloud/cloud_agent_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,77 @@


class CloudAgentEngine(BaseModel):
"""Manages cloud agent deployment and operations on Volcengine FaaS platform.

This class handles authentication with Volcengine, deploys local projects to FaaS,
updates function code, removes applications, and supports local testing.

Attributes:
volcengine_access_key (str): Access key for Volcengine authentication.
Defaults to VOLCENGINE_ACCESS_KEY environment variable.
volcengine_secret_key (str): Secret key for Volcengine authentication.
Defaults to VOLCENGINE_SECRET_KEY environment variable.
region (str): Region for Volcengine services. Defaults to "cn-beijing".
_vefaas_service (VeFaaS): Internal VeFaaS client instance, initialized post-creation.

Note:
Credentials must be set via environment variables for default behavior.
This class performs interactive confirmations for destructive operations like removal.

Examples:
```python
from veadk.cloud.cloud_agent_engine import CloudAgentEngine
engine = CloudAgentEngine()
app = engine.deploy("test-app", "/path/to/local/project")
print(app.vefaas_endpoint)
```
"""

volcengine_access_key: str = getenv("VOLCENGINE_ACCESS_KEY")
volcengine_secret_key: str = getenv("VOLCENGINE_SECRET_KEY")
region: str = "cn-beijing"

def model_post_init(self, context: Any, /) -> None:
"""Initializes the internal VeFaaS service after Pydantic model validation.

Creates a VeFaaS instance using the configured access key, secret key, and region.

Args:
self: The CloudAgentEngine instance.
context: Pydantic post-init context parameter (not used).

Returns:
None

Note:
This is a Pydantic lifecycle method, ensuring service readiness after init.
"""
self._vefaas_service = VeFaaS(
access_key=self.volcengine_access_key,
secret_key=self.volcengine_secret_key,
region=self.region,
)

def _prepare(self, path: str, name: str):
"""Prepares the local project for deployment by validating path and name.

Checks if the path exists and is a directory, validates application name format.

Args:
path (str): Full or relative path to the local agent project directory.
name (str): Intended VeFaaS application name.

Returns:
None

Raises:
AssertionError: If path does not exist or is not a directory.
ValueError: If name contains invalid characters like underscores.

Note:
Includes commented code for handling requirements.txt; not executed currently.
Called internally by deploy and update methods.
"""
# basic check
assert os.path.exists(path), f"Local agent project path `{path}` not exists."
assert os.path.isdir(path), (
Expand Down Expand Up @@ -73,10 +132,23 @@ def _prepare(self, path: str, name: str):
# )

def _try_launch_fastapi_server(self, path: str):
"""Try to launch a fastapi server for tests according to user's configuration.
"""Tries to start a FastAPI server locally for testing deployment readiness.

Runs the project's run.sh script and checks connectivity on port 8000.

Args:
path (str): Local agent project path.
path (str): Path to the local project containing run.sh.

Returns:
None

Raises:
RuntimeError: If server startup times out after 30 seconds.

Note:
Sets _FAAS_FUNC_TIMEOUT environment to 900 seconds.
Streams output to console and terminates process after successful check.
Assumes run.sh launches server on 0.0.0.0:8000.
"""
RUN_SH = f"{path}/run.sh"

Expand Down Expand Up @@ -128,19 +200,34 @@ def deploy(
use_adk_web: bool = False,
local_test: bool = False,
) -> CloudApp:
"""Deploy local agent project to Volcengine FaaS platform.
"""Deploys a local agent project to Volcengine FaaS, creating necessary resources.

Prepares project, optionally tests locally, deploys via VeFaaS, and returns app instance.

Args:
application_name (str): Expected VeFaaS application name.
path (str): Local agent project path.
gateway_name (str): Gateway name.
gateway_service_name (str): Gateway service name.
gateway_upstream_name (str): Gateway upstream name.
use_adk_web (bool): Whether to use ADK Web.
local_test (bool): Whether to run local test for FastAPI Server.
application_name (str): Unique name for the VeFaaS application.
path (str): Local directory path of the agent project.
gateway_name (str, optional): Custom gateway resource name. Defaults to timestamped.
gateway_service_name (str, optional): Custom service name. Defaults to timestamped.
gateway_upstream_name (str, optional): Custom upstream name. Defaults to timestamped.
use_adk_web (bool): Enable ADK Web configuration. Defaults to False.
local_test (bool): Perform FastAPI server test before deploy. Defaults to False.

Returns:
CloudApp: The deployed cloud application instance.
CloudApp: Deployed application with endpoint, name, and ID.

Raises:
ValueError: On deployment failure, such as invalid config or VeFaaS errors.

Note:
Converts path to absolute; sets telemetry opt-out and ADK Web env vars.
Generates default gateway names if not specified.

Examples:
```python
app = engine.deploy("my-agent", "./agent-project", local_test=True)
print(f"Deployed at: {app.vefaas_endpoint}")
```
"""
# prevent deepeval writing operations
veadk_environments["DEEPEVAL_TELEMETRY_OPT_OUT"] = "YES"
Expand Down Expand Up @@ -185,6 +272,28 @@ def deploy(
)

def remove(self, app_name: str):
"""Deletes a deployed cloud application after user confirmation.

Locates app by name, confirms, and issues delete via VeFaaS.

Args:
app_name (str): Name of the application to remove.

Returns:
None

Raises:
ValueError: If application not found by name.

Note:
Interactive prompt required; cancels on non-'y' input.
Deletion is processed asynchronously by VeFaaS.

Examples:
```python
engine.remove("my-agent")
```
"""
confirm = input(f"Confirm delete cloud app {app_name}? (y/N): ")
if confirm.lower() != "y":
print("Delete cancelled.")
Expand All @@ -202,14 +311,28 @@ def update_function_code(
application_name: str,
path: str,
) -> CloudApp:
"""Update existing agent project code while keeping the same URL.
"""Updates the code in an existing VeFaaS application without changing endpoint.

Prepares new code from local path and updates function via VeFaaS.

Args:
application_name (str): Existing application name to update.
path (str): Local agent project path.
application_name (str): Name of the existing application to update.
path (str): Local path containing updated project files.

Returns:
CloudApp: Updated cloud app with same endpoint.
CloudApp: Updated application instance with same endpoint.

Raises:
ValueError: If update fails due to preparation or VeFaaS issues.

Note:
Preserves gateway and other resources; only function code is updated.
Path is resolved to absolute before processing.

Examples:
```python
updated_app = engine.update_function_code("my-agent", "./updated-project")
```
"""
# convert `path` to absolute path
path = str(Path(path).resolve())
Expand Down
Loading